Posted to user@flume.apache.org by Brock Noland <br...@cloudera.com> on 2013/04/08 20:42:41 UTC
Re: write-timeout value tuning
There is no harm in setting write-timeout to something like 30 seconds. In
fact it probably makes sense to increase the default to 30 seconds.
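Against the file channel configuration quoted at the end of the original mail (where the channel is named `fc` and write-timeout is set to 10), that advice amounts to a one-line change:

```properties
# Raise the file channel's lock wait from 10 seconds to Brock's suggested 30
col1.channels.fc.write-timeout = 30
```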
On Mon, Apr 8, 2013 at 1:38 PM, Madhu Gmail <ma...@gmail.com> wrote:
>
> Hello,
>
> I am getting the below ERROR in a flume agent (acting as a collector) which is receiving log events from another flume agent.
>
> I have also copied my flume-conf.properties at the end of this mail.
>
> Any idea how to tune the write-timeout value?
>
> 2013-04-05 13:17:33,197 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fc]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:434)
>         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:91)
>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:189)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2013-04-05 13:17:33,427 INFO org.apache.flume.channel.file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1365169979081, queueSize: 0, queueHead: 362421
> 2013-04-05 13:17:34,233 INFO org.apache.flume.channel.file.LogFileV3: Updating log-14.meta currentPosition = 3818784, logWriteOrderID = 1365169979081
> 2013-04-05 13:17:34,294 INFO org.apache.flume.channel.file.Log: Updated checkpoint for file: /opt/sponge/flume/file-channel/dataDirs/log-14 position: 3818784 logWriteOrderID: 1365169979081
> 2013-04-05 13:17:34,294 DEBUG org.apache.flume.channel.file.Log: Rolling back 1365169950299
> 2013-04-05 13:17:34,296 ERROR org.apache.flume.source.AvroSource: Avro source S1: Unable to process event batch. Exception follows.
> org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel fc { dataDirs: [/opt/sponge/flume/file-channel/dataDirs] }
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:237)
>         at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
>         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
>         at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
>         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
>         at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fc]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:400)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:76)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
>         ... 28 more
> 2013-04-05 13:17:34,296 DEBUG org.apache.flume.channel.file.Log: Files currently in use: [14]
> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] DISCONNECTED
> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] UNBOUND
> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: [id: 0x498b5a73, /10.42.202.131:42784 :> /10.96.172.44:1442] CLOSED
> 2013-04-05 13:17:34,305 INFO org.apache.avro.ipc.NettyServer: Connection to /10.42.202.131:42784 disconnected.
> 2013-04-05 13:17:38,200 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2013-04-05 13:17:39,318 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] OPEN
> 2013-04-05 13:17:43,202 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2013-04-05 13:17:45,853 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] BOUND: /10.96.172.44:1442
> 2013-04-05 13:17:45,853 INFO org.apache.avro.ipc.NettyServer: [id: 0x4e48f32a, /10.42.202.131:44085 => /10.96.172.44:1442] CONNECTED: /10.42.202.131:44085
> 2013-04-05 13:17:45,854 DEBUG org.apache.flume.source.AvroSource: Avro source S1: Received avro event batch of 39 events.
> 2013-04-05 13:17:45,958 DEBUG org.apache.flume.source.AvroSource: Avro source S1: Received avro event batch of 1 events.
> 2013-04-05 13:17:48,499 DEBUG org.apache.zookeeper.ClientCnxn: Got ping response for sessionid: 0x53dca4664900059 after 0ms
> 2013-04-05 13:17:50,854 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2013-04-05 13:17:55,856 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>         at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
>
> ==================================================================
> col1.sources = S1
> col1.channels = fc
> col1.sinks = hBaseSink1
>
> col1.sources.S1.type = avro
> col1.sources.S1.bind=vm-15c2-3bbf
> col1.sources.S1.port=1442
> col1.sources.S1.channels = fc
>
> # Each sink's type must be defined
> col1.sinks.hBaseSink1.type = org.apache.flume.sink.hbase.HBaseSink
> col1.sinks.hBaseSink1.table=elf_log
> col1.sinks.hBaseSink1.columnFamily=content
> col1.sinks.hBaseSink1.serializer=com.citi.sponge.flume.collector.sink.LogHbaseEventSerializer
> col1.sinks.hBaseSink1.timeout=120
> col1.sinks.hBaseSink1.batchSize=20
>
> # Specify the channel the sink should use
> col1.sinks.hBaseSink1.channel = fc
>
> # Each channel's type is defined.
> col1.channels.fc.type = file
> col1.channels.fc.checkpointDir = /opt/sponge/flume/file-channel/checkpoint
> col1.channels.fc.dataDirs = /opt/sponge/flume/file-channel/dataDirs
> col1.channels.fc.transactionCapacity = 1000
> col1.channels.fc.checkpointInterval = 30000
> col1.channels.fc.maxFileSize = 2146435071
> col1.channels.fc.minimumRequiredSpace = 524288000
> col1.channels.fc.keep-alive = 5
> col1.channels.fc.write-timeout = 10
> col1.channels.fc.checkpoint-timeout = 600
>
> Thanks
> Madhu
>
--
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
Re: write-timeout value tuning
Posted by Madhu Gmail <ma...@gmail.com>.
Thanks Brock.
Madhu Munagala
(214)679-2872
On Apr 8, 2013, at 2:15 PM, Brock Noland <br...@cloudera.com> wrote:
> The channel is checkpointing. Reducing the channel's capacity or making whatever disk checkpointDir is on faster (a dedicated disk, SSD, etc.) would speed up checkpointing.
>
>
> On Mon, Apr 8, 2013 at 2:12 PM, Madhu Gmail <ma...@gmail.com> wrote:
>> Thanks Brock.
>>
>> But I am curious to know which other property setting causes this write-timeout issue.
>>
>> I have the sink batch size as 20 and no batch size on the source side of the collector. This agent acts as a collector for other flume agents, which send their events to it.
>>
>> Madhu Munagala
>> (214)679-2872
Re: write-timeout value tuning
Posted by Brock Noland <br...@cloudera.com>.
The channel is checkpointing. Reducing the channel's capacity or making whatever disk checkpointDir is on faster (a dedicated disk, SSD, etc.) would speed up checkpointing.
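Applied to the configuration from earlier in the thread, that tuning might look like the sketch below. The SSD mount point is hypothetical, and `capacity` (the file channel's maximum number of events, left at its default in the original config) is shown with an illustrative lower value:

```properties
# Hypothetical mount point: put the checkpoint on its own fast disk (e.g. an SSD)
col1.channels.fc.checkpointDir = /ssd1/flume/file-channel/checkpoint
# A smaller capacity means a smaller checkpoint and shorter checkpoint passes
col1.channels.fc.capacity = 100000
```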
>> source S1: Received avro event batch of 39 events.****
>>
>> 2013-04-05 13:17:45,958 DEBUG org.apache.flume.source.AvroSource: Avro
>> source S1: Received avro event batch of 1 events.****
>>
>> 2013-04-05 13:17:48,499 DEBUG org.apache.zookeeper.ClientCnxn: Got ping
>> response for sessionid: 0x53dca4664900059 after 0ms****
>>
>> 2013-04-05 13:17:50,854 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.****
>>
>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>> ****
>>
>> at
>> com.google.common.base.Preconditions.checkState(Preconditions.java:145)**
>> **
>>
>> at
>> org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>> ****
>>
>> at
>> org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)****
>>
>> at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>> ****
>>
>> at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)****
>>
>> at java.lang.Thread.run(Thread.java:662)****
>>
>> 2013-04-05 13:17:55,856 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.****
>>
>> java.lang.IllegalStateException: begin() called when transaction is OPEN!
>> ****
>>
>> at
>> com.google.common.base.Preconditions.checkState(Preconditions.java:145)**
>> **
>>
>> at
>> org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>> ****
>>
>> at
>> org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:187)****
>>
>> at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>> ****
>>
>> at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)****
>>
>> at java.lang.Thread.run(Thread.java:662)****
>>
>> ** **
>>
>> ** **
>>
>> ** **
>>
>> ==================================================================****
>>
>> col1.sources = S1****
>>
>> col1.channels = fc****
>>
>> col1.sinks = hBaseSink1****
>>
>> ** **
>>
>> ** **
>>
>> col1.sources.S1.type = avro****
>>
>> col1.sources.S1.bind=vm-15c2-3bbf****
>>
>> col1.sources.S1.port=1442****
>>
>> col1.sources.S1.channels = fc****
>>
>> ** **
>>
>> ** **
>>
>> # Each sink's type must be defined****
>>
>> col1.sinks.hBaseSink1.type = org.apache.flume.sink.hbase.HBaseSink****
>>
>> col1.sinks.hBaseSink1.table=elf_log****
>>
>> col1.sinks.hBaseSink1.columnFamily=content****
>>
>>
>> col1.sinks.hBaseSink1.serializer=com.citi.sponge.flume.collector.sink.LogHbaseEventSerializer
>> ****
>>
>> col1.sinks.hBaseSink1.timeout=120****
>>
>> col1.sinks.hBaseSink1.batchSize=20****
>>
>> ** **
>>
>> ** **
>>
>> #Specify the channel the sink should use****
>>
>> col1.sinks.hBaseSink1.channel = fc****
>>
>> ** **
>>
>> ** **
>>
>> # Each channel's type is defined.****
>>
>> col1.channels.fc.type = file****
>>
>> col1.channels.fc.checkpointDir = /opt/sponge/flume/file-channel/checkpoint
>> ****
>>
>> col1.channels.fc.dataDirs = /opt/sponge/flume/file-channel/dataDirs****
>>
>> col1.channels.fc.transactionCapacity = 1000****
>>
>> col1.channels.fc.checkpointInterval = 30000****
>>
>> col1.channels.fc.maxFileSize = 2146435071****
>>
>> col1.channels.fc.minimumRequiredSpace = 524288000****
>>
>> col1.channels.fc.keep-alive = 5****
>>
>> col1.channels.fc.write-timeout = 10****
>>
>> col1.channels.fc.checkpoint-timeout = 600****
>>
>>
>>
>> Thanks
>> Madhu
>>
--
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
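[Editor's note] Brock's advice amounts to a one-line change in the collector's flume-conf.properties. A sketch against the configuration quoted above (agent col1, channel fc); the 30-second value is his suggestion, not a measured optimum:

```properties
# write-timeout (seconds) bounds how long a put/take waits for the file
# channel's log lock before failing with "Failed to obtain lock for
# writing to the log". The default of 10 is tight when checkpoints or
# downstream (HBase) flushes run long; 30 is a safe, low-risk value.
col1.channels.fc.write-timeout = 30
```

Restart the agent after changing the value; channel properties are read at startup.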
Re: : write-timeout value tuning
Posted by Madhu Gmail <ma...@gmail.com>.
Thanks Brock.
Out of curiosity, though: which other property settings can contribute to this write-timeout issue?
My sink batch size is 20, and no batch size is set on the source side of the collector. This agent acts as a collector for other Flume agents, which send their events to it.
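[Editor's note] For context on why raising write-timeout helps: in this era of Flume, puts and takes on a file channel acquire a shared lock on the log with a bounded wait of write-timeout seconds, while checkpointing holds the exclusive lock (bounded by checkpoint-timeout). The sketch below illustrates that contention pattern with plain java.util.concurrent; it is not Flume source code, and the class and method names are invented for illustration:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch, NOT Flume source: event writers take a shared lock
// with a bounded wait (write-timeout); a checkpoint holds the exclusive
// lock. A checkpoint or disk stall longer than the write timeout therefore
// surfaces as "Failed to obtain lock for writing to the log".
public class WriteTimeoutSketch {
    static final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();

    // Returns false when the shared lock cannot be obtained within the
    // timeout -- the point where Flume throws ChannelException.
    static boolean tryWrite(long writeTimeoutSec) {
        try {
            if (!logLock.readLock().tryLock(writeTimeoutSec, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            return false;
        }
        try {
            return true; // the append to the channel's data file would go here
        } finally {
            logLock.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        logLock.writeLock().lock(); // simulate a checkpoint in progress
        Thread writer = new Thread(
                () -> System.out.println("write ok: " + tryWrite(1)));
        writer.start();
        writer.join();              // writer times out after ~1s: false
        logLock.writeLock().unlock();
        System.out.println("write ok: " + tryWrite(1)); // succeeds now: true
    }
}
```

With write-timeout = 10 as in the quoted config, any exclusive-lock hold longer than 10 seconds produces exactly this error, which is why a larger value is harmless: it only lengthens the bounded wait.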
Madhu Munagala
(214)679-2872
On Apr 8, 2013, at 1:42 PM, Brock Noland <br...@cloudera.com> wrote:
> There is no harm in setting write-timeout to something like 30 seconds. In fact it probably makes sense to increase the default to 30 seconds.