You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by 周梦想 <ab...@gmail.com> on 2013/02/26 06:57:41 UTC

Take list for MemoryTransaction, capacity 100 full?

hello,
I using flume-ng send data from windows to linux hdfs through avro
protocol, and encountered this error:

2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
source userlogsrc: Received avro event batch of 100 events.

2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[ERROR -
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction,
capacity 100 full, consider committing more frequently, increasing
capacity, or increasing thread count
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
        at
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more

I have set memory channel capacity to 1000, but it still report this error.
some one can give me any advice?

Thanks,
Andy

hdfs.conf:

agent46.sources = userlogsrc gamelogsrc
agent46.channels = memch1
agent46.sinks = myhdfssink

#channels:
agent46.channels.memch1.type = memory
agent46.channels.memch1.capacity = 10000
agent46.channels.memch1.transactionCapactiy = 100
#sources:
#userlogsrc:
#agent46.sources.userlogsrc.type = syslogTcp
agent46.sources.userlogsrc.type = avro
agent46.sources.userlogsrc.port = 5140
agent46.sources.userlogsrc.bind= 0.0.0.0
#agent46.sources.userlogsrc.host= hadoop48
agent46.sources.userlogsrc.interceptors = i1 i2 i3
agent46.sources.userlogsrc.interceptors.i1.type =
org.apache.flume.interceptor.HostInterceptor$Builder
agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
#agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
agent46.sources.userlogsrc.interceptors.i1.useIP = false
agent46.sources.userlogsrc.interceptors.i2.type =
org.apache.flume.interceptor.TimestampInterceptor$Builder
agent46.sources.userlogsrc.interceptors.i3.type = static
agent46.sources.userlogsrc.interceptors.i3.key = datacenter
agent46.sources.userlogsrc.interceptors.i3.value = userdata
agent46.sources.userlogsrc.channels = memch1
#gamelogsrc:
#agent46.sources.gamelogsrc.type = syslogTcp
agent46.sources.gamelogsrc.type = avro
agent46.sources.gamelogsrc.port = 5150
agent46.sources.gamelogsrc.bind= 0.0.0.0
agent46.sources.gamelogsrc.channels = memch1
#sinks:
agent46.sinks.myhdfssink.channel = memch1

agent46.sinks.myhdfssink.type = hdfs
agent46.sinks.myhdfssink.hdfs.rollInterval = 120
agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
agent46.sinks.myhdfssink.hdfs.rollCount = 600000
agent46.sinks.myhdfssink.hdfs.batchSize = 1000
agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
#agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
#agent46.sinks.myhdfssink.hdfs.filePrefix =
%{filename}.%{hostname}.%{datacenter}.%Y%m%d
agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
#agent46.sinks.myhdfssink.hdfs.rollInterval = 60
#agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
agent46.sinks.myhdfssink.hdfs.fileType = DataStream
#agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text

Re: Take list for MemoryTransaction, capacity 100 full?

Posted by 周梦想 <ab...@gmail.com>.
Hi Hari,
here is my main configure:
agent46.sources = userlogsrc gamelogsrc
agent46.channels = memch1
agent46.sinks = myhdfssink

 agent46.sinks.myhdfssink.hdfs.batchSize = 50
agent46.channels.memch1.type = memory
agent46.channels.memch1.capacity = 100000
agent46.channels.memch1.transactionCapactiy = 1000
agent46.sources.userlogsrc.type = avro
agent46.sources.gamelogsrc.type = avro

I have 8 avro sink to send data to agent46 sources, and evero avro sink's
batchsize is set to 40.
but it report the HDFS IO error and MemoryChannel exception.
HDFS IO error only appears a time. After that,it's about 2 minutes to
report ChannelException,and disconnect all agents and reconnect again.

It is caused by HDFS is too slow to flush? or other problem? How about the
configure relationship between source and channel and sink?

thank you!
Andy

below is some log:
2013-02-26 19:00:39,757 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[WARN -
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)]
HDFS IO error
java.io.IOException: Callable timed out after 10000 ms
        at
org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:352)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:727)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.util.concurrent.TimeoutException
        at
java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:345)
        ... 5 more

2013-02-26 19:00:39,758 (pool-8-thread-4) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: memch1}
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)

2013-02-26 19:02:16,023 (pool-8-thread-4) [DEBUG -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
source userlogsrc: Received avro event batch of 40 events.
2013-02-26 19:02:23,602 (pool-8-thread-7) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: memch1}
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
        at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more
2013-02-26 19:02:23,603 (pool-8-thread-9) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
        at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] DISCONNECTED
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] UNBOUND
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] CLOSED
2013-02-26 19:02:28,133 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)]
Connection to /192.168.133.44:17031 disconnected.


2013/2/26 Hari Shreedharan <hs...@cloudera.com>

>  This is because the memory channel has a default transaction capacity of
> 100. Increasing it (or keeping sinks's batchSize < transaction capacity of
> the channel will fix the issue). See
> http://flume.apache.org/FlumeUserGuide.html#memory-channel for more
> details.
>
> Hari
>
> --
> Hari Shreedharan
>
> On Monday, February 25, 2013 at 11:41 PM, 周梦想 wrote:
>
> I found if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it will report
> this error.
> if I set this configure to 10, it's ok. but it's a bit slower.
>
> Best Regards,
> Andy
>
> 2013/2/26 周梦想 <ab...@gmail.com>
>
> more logs:
>
> 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException:
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Take list for
> MemoryTransaction, capacity 100 full, consider committing more frequently,
> increasing capacity, or increasing thread count
>          at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         ... 3 more
> 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
> source userlogsrc: Unable to process event batch. Exception follows.
> org.apache.flume.ChannelException: Unable to put batch on required
> channel: org.apache.flume.channel.MemoryChannel{name: memch1}
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
>         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at
> org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
>
> 2013/2/26 周梦想 <ab...@gmail.com>
>
> hello,
> I using flume-ng send data from windows to linux hdfs through avro
> protocol, and encountered this error:
>
> 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
> source userlogsrc: Received avro event batch of 100 events.
>
> 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
> process failed
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
> I have set memory channel capacity to 1000, but it still report this error.
> some one can give me any advice?
>
> Thanks,
> Andy
>
> hdfs.conf:
>
> agent46.sources = userlogsrc gamelogsrc
> agent46.channels = memch1
> agent46.sinks = myhdfssink
>
> #channels:
> agent46.channels.memch1.type = memory
> agent46.channels.memch1.capacity = 10000
> agent46.channels.memch1.transactionCapactiy = 100
> #sources:
> #userlogsrc:
> #agent46.sources.userlogsrc.type = syslogTcp
> agent46.sources.userlogsrc.type = avro
> agent46.sources.userlogsrc.port = 5140
> agent46.sources.userlogsrc.bind= 0.0.0.0
> #agent46.sources.userlogsrc.host= hadoop48
> agent46.sources.userlogsrc.interceptors = i1 i2 i3
> agent46.sources.userlogsrc.interceptors.i1.type =
> org.apache.flume.interceptor.HostInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> agent46.sources.userlogsrc.interceptors.i1.useIP = false
>  agent46.sources.userlogsrc.interceptors.i2.type =
> org.apache.flume.interceptor.TimestampInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i3.type = static
> agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> agent46.sources.userlogsrc.interceptors.i3.value = userdata
> agent46.sources.userlogsrc.channels = memch1
> #gamelogsrc:
> #agent46.sources.gamelogsrc.type = syslogTcp
> agent46.sources.gamelogsrc.type = avro
> agent46.sources.gamelogsrc.port = 5150
> agent46.sources.gamelogsrc.bind= 0.0.0.0
> agent46.sources.gamelogsrc.channels = memch1
> #sinks:
> agent46.sinks.myhdfssink.channel = memch1
>
> agent46.sinks.myhdfssink.type = hdfs
> agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
> agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> #agent46.sinks.myhdfssink.hdfs.filePrefix =
> %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
>
>
>
>
>
>

Re: Take list for MemoryTransaction, capacity 100full?

Posted by Hari Shreedharan <hs...@cloudera.com>.
This is because the memory channel has a default transaction capacity of 100. Increasing it (or keeping sinks's batchSize < transaction capacity of the channel will fix the issue). See http://flume.apache.org/FlumeUserGuide.html#memory-channel for more details.

Hari  

--  
Hari Shreedharan


On Monday, February 25, 2013 at 11:41 PM, 周梦想 wrote:

> I found if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it will report this error.
> if I set this configure to 10, it's ok. but it's a bit slower.
>  
> Best Regards,
> Andy
>  
> 2013/2/26 周梦想 <ablozhou@gmail.com (mailto:ablozhou@gmail.com)>
> > more logs:
> >  
> > 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.  
> > org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
> >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> >         at java.lang.Thread.run(Thread.java:722)
> >  
> > Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> >         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> >         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> >  
> >         ... 3 more
> > 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro source userlogsrc: Unable to process event batch. Exception follows.
> > org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memch1}
> >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> >         at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
> >         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:601)
> >         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
> >         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
> >         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
> >         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
> >         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
> >         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
> >         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
> >         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
> >         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
> >         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> >         at java.lang.Thread.run(Thread.java:722)
> > Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
> >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> >         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> >         ... 28 more
> >  
> >  
> >  
> >  
> > 2013/2/26 周梦想 <ablozhou@gmail.com (mailto:ablozhou@gmail.com)>
> > > hello,
> > > I using flume-ng send data from windows to linux hdfs through avro protocol, and encountered this error:
> > >  
> > > 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro source userlogsrc: Received avro event batch of 100 events.  
> > >  
> > > 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)] process failed
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> > >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> > >         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> > >         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > >  
> > > Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
> > >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> > >         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> > >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> > >         ... 28 more
> > >  
> > >  
> > > I have set memory channel capacity to 1000, but it still report this error.  
> > > some one can give me any advice?
> > >  
> > > Thanks,
> > > Andy
> > >  
> > > hdfs.conf:
> > >  
> > > agent46.sources = userlogsrc gamelogsrc
> > > agent46.channels = memch1
> > > agent46.sinks = myhdfssink
> > >  
> > > #channels:
> > > agent46.channels.memch1.type = memory
> > > agent46.channels.memch1.capacity = 10000
> > > agent46.channels.memch1.transactionCapactiy = 100
> > > #sources:
> > > #userlogsrc:
> > > #agent46.sources.userlogsrc.type = syslogTcp
> > > agent46.sources.userlogsrc.type = avro
> > > agent46.sources.userlogsrc.port = 5140
> > > agent46.sources.userlogsrc.bind= 0.0.0.0
> > > #agent46.sources.userlogsrc.host= hadoop48
> > > agent46.sources.userlogsrc.interceptors = i1 i2 i3
> > > agent46.sources.userlogsrc.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> > > #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> > > agent46.sources.userlogsrc.interceptors.i1.useIP = false
> > > agent46.sources.userlogsrc.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i3.type = static
> > > agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> > > agent46.sources.userlogsrc.interceptors.i3.value = userdata
> > > agent46.sources.userlogsrc.channels = memch1
> > > #gamelogsrc:
> > > #agent46.sources.gamelogsrc.type = syslogTcp
> > > agent46.sources.gamelogsrc.type = avro
> > > agent46.sources.gamelogsrc.port = 5150
> > > agent46.sources.gamelogsrc.bind= 0.0.0.0
> > > agent46.sources.gamelogsrc.channels = memch1
> > >  
> > > #sinks:
> > > agent46.sinks.myhdfssink.channel = memch1
> > >  
> > > agent46.sinks.myhdfssink.type = hdfs
> > > agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> > > agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> > > agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> > > agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> > > agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> > > agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> > > agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
> > > agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> > > agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> > > #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> > > #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> > > agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> > > #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
> > >  
> > >  
> > >  
> >  
>  


Re: Take list for MemoryTransaction, capacity 100 full?

Posted by 周梦想 <ab...@gmail.com>.
I found if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it will report
this error.
if I set this configure to 10, it's ok. but it's a bit slower.

Best Regards,
Andy

2013/2/26 周梦想 <ab...@gmail.com>

> more logs:
>
> 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException:
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Take list for
> MemoryTransaction, capacity 100 full, consider committing more frequently,
> increasing capacity, or increasing thread count
>          at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         ... 3 more
> 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
> source userlogsrc: Unable to process event batch. Exception follows.
> org.apache.flume.ChannelException: Unable to put batch on required
> channel: org.apache.flume.channel.MemoryChannel{name: memch1}
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
>         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at
> org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
>
> 2013/2/26 周梦想 <ab...@gmail.com>
>
>> hello,
>> I using flume-ng send data from windows to linux hdfs through avro
>> protocol, and encountered this error:
>>
>> 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
>> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
>> source userlogsrc: Received avro event batch of 100 events.
>>
>> 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
>> [ERROR -
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
>> process failed
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>         at
>> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>>         at
>> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: org.apache.flume.ChannelException: Space for commit to queue
>> couldn't be acquired Sinks are likely not keeping up with sources, or the
>> buffer size is too tight
>>         at
>> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>>         at
>> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>>         ... 28 more
>>
>> I have set memory channel capacity to 1000, but it still report this
>> error.
>> some one can give me any advice?
>>
>> Thanks,
>> Andy
>>
>> hdfs.conf:
>>
>> agent46.sources = userlogsrc gamelogsrc
>> agent46.channels = memch1
>> agent46.sinks = myhdfssink
>>
>> #channels:
>> agent46.channels.memch1.type = memory
>> agent46.channels.memch1.capacity = 10000
>> agent46.channels.memch1.transactionCapactiy = 100
>> #sources:
>> #userlogsrc:
>> #agent46.sources.userlogsrc.type = syslogTcp
>> agent46.sources.userlogsrc.type = avro
>> agent46.sources.userlogsrc.port = 5140
>> agent46.sources.userlogsrc.bind= 0.0.0.0
>> #agent46.sources.userlogsrc.host= hadoop48
>> agent46.sources.userlogsrc.interceptors = i1 i2 i3
>> agent46.sources.userlogsrc.interceptors.i1.type =
>> org.apache.flume.interceptor.HostInterceptor$Builder
>> agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
>> #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
>> agent46.sources.userlogsrc.interceptors.i1.useIP = false
>>  agent46.sources.userlogsrc.interceptors.i2.type =
>> org.apache.flume.interceptor.TimestampInterceptor$Builder
>> agent46.sources.userlogsrc.interceptors.i3.type = static
>> agent46.sources.userlogsrc.interceptors.i3.key = datacenter
>> agent46.sources.userlogsrc.interceptors.i3.value = userdata
>> agent46.sources.userlogsrc.channels = memch1
>> #gamelogsrc:
>> #agent46.sources.gamelogsrc.type = syslogTcp
>> agent46.sources.gamelogsrc.type = avro
>> agent46.sources.gamelogsrc.port = 5150
>> agent46.sources.gamelogsrc.bind= 0.0.0.0
>> agent46.sources.gamelogsrc.channels = memch1
>> #sinks:
>> agent46.sinks.myhdfssink.channel = memch1
>>
>> agent46.sinks.myhdfssink.type = hdfs
>> agent46.sinks.myhdfssink.hdfs.rollInterval = 120
>> agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
>> agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
>> agent46.sinks.myhdfssink.hdfs.rollCount = 600000
>> agent46.sinks.myhdfssink.hdfs.batchSize = 1000
>> agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
>> agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
>> agent46.sinks.myhdfssink.hdfs.path =
>> hdfs://h46:9000/flume/%{filename}/%m%d
>> #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
>> #agent46.sinks.myhdfssink.hdfs.filePrefix =
>> %{filename}.%{hostname}.%{datacenter}.%Y%m%d
>> agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
>> #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
>> #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
>> agent46.sinks.myhdfssink.hdfs.fileType = DataStream
>> #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
>>
>>
>>
>

Re: Take list for MemoryTransaction, capacity 100 full?

Posted by 周梦想 <ab...@gmail.com>.
more logs:

2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[ERROR -
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException:
Take list for MemoryTransaction, capacity 100 full, consider committing
more frequently, increasing capacity, or increasing thread count
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Take list for
MemoryTransaction, capacity 100 full, consider committing more frequently,
increasing capacity, or increasing thread count
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
        at
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
        ... 3 more
2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: memch1}
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
        at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more


2013/2/26 周梦想 <ab...@gmail.com>

> hello,
> I using flume-ng send data from windows to linux hdfs through avro
> protocol, and encountered this error:
>
> 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
> source userlogsrc: Received avro event batch of 100 events.
>
> 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
> process failed
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
> I have set memory channel capacity to 1000, but it still report this error.
> some one can give me any advice?
>
> Thanks,
> Andy
>
> hdfs.conf:
>
> agent46.sources = userlogsrc gamelogsrc
> agent46.channels = memch1
> agent46.sinks = myhdfssink
>
> #channels:
> agent46.channels.memch1.type = memory
> agent46.channels.memch1.capacity = 10000
> agent46.channels.memch1.transactionCapactiy = 100
> #sources:
> #userlogsrc:
> #agent46.sources.userlogsrc.type = syslogTcp
> agent46.sources.userlogsrc.type = avro
> agent46.sources.userlogsrc.port = 5140
> agent46.sources.userlogsrc.bind= 0.0.0.0
> #agent46.sources.userlogsrc.host= hadoop48
> agent46.sources.userlogsrc.interceptors = i1 i2 i3
> agent46.sources.userlogsrc.interceptors.i1.type =
> org.apache.flume.interceptor.HostInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> agent46.sources.userlogsrc.interceptors.i1.useIP = false
>  agent46.sources.userlogsrc.interceptors.i2.type =
> org.apache.flume.interceptor.TimestampInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i3.type = static
> agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> agent46.sources.userlogsrc.interceptors.i3.value = userdata
> agent46.sources.userlogsrc.channels = memch1
> #gamelogsrc:
> #agent46.sources.gamelogsrc.type = syslogTcp
> agent46.sources.gamelogsrc.type = avro
> agent46.sources.gamelogsrc.port = 5150
> agent46.sources.gamelogsrc.bind= 0.0.0.0
> agent46.sources.gamelogsrc.channels = memch1
> #sinks:
> agent46.sinks.myhdfssink.channel = memch1
>
> agent46.sinks.myhdfssink.type = hdfs
> agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
> agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> #agent46.sinks.myhdfssink.hdfs.filePrefix =
> %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
>
>
>