Posted to user@flume.apache.org by Muhammad Zeeshan Munir <mz...@gmail.com> on 2016/05/14 10:17:23 UTC
Unable to put batch - kafka.KafkaSource: KafkaSource EXCEPTION, {}
Hello,
I am getting the following error in a Flume agent that moves data from a
Kafka source to an Elasticsearch sink.
I have a virtual machine with 64 GB of RAM and I am running the Flume agent
with a heap of about 32 GB, using the parameter -Xmx32000m.
I have tried different configurations, changing the batch size in the
Kafka source and in the Elasticsearch sink. I have also tried changing the
memory channel's capacity, transactionCapacity, byteCapacity, etc. to
different values.
Everything runs fine for some time and then the agent starts throwing
exceptions. Sometimes it runs for a few minutes, sometimes for hours, and
sometimes for a day or so before failing. Any help and advice would be
highly appreciated. Thank you in advance.
Configuration:
a4.sources = s1
a4.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a4.sources.s1.zookeeperConnect = zk01:2181
a4.sources.s1.topic = topic1
a4.sources.s1.groupId = flume
a4.sources.s1.batchSize = 1000
a4.sources.s1.channels = c4
a4.sources.s1.consumer.timeout.ms = 1000
a4.sources.s1.backoffSleepIncrement=1000
#a4.sources.s1.batchDurationMillis=1000
a4.sources.s1.readSmallestOffset=false
#Sink
a4.sinks = k4
a4.sinks.k4.channel = c4
a4.sinks.k4.type=org.flume.sink.elasticsearch.ElasticSearchSink
a4.sinks.k4.client = transport
a4.sinks.k4.batchSize = 80000
a4.sinks.k4.hostNames = esnode:9300
a4.sinks.k4.indexName = index
a4.sinks.k4.indexType = index-type
a4.sinks.k4.clusterName = es
a4.sinks.k4.serializer = org.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
a4.sinks.k4.indexNameBuilder = org.flume.sink.elasticsearch.TimeBasedIndexNameBuilder
a4.channels = c4
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000000
a4.channels.c4.transactionCapacity = 100000
a4.channels.c4.byteCapacityBufferPercentage = 20
a4.channels.c4.byteCapacity = 28000000000
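A configuration like the one above can be sanity-checked before starting the agent. The following is a minimal sketch, not part of Flume: `parse_properties`, `check_agent`, `VALID_SECTIONS`, and `SAMPLE` are hypothetical names made up for illustration. It assumes that every key has the form `<agent>.<sources|sinks|channels|sinkgroups>...`, that the memory channel defaults for capacity and transactionCapacity are 100, and that a component's batchSize should not exceed the transactionCapacity of the channel it is attached to.

```python
# Hypothetical helper, not a Flume tool: checks a few relationships between
# the values in a Flume agent properties file.
VALID_SECTIONS = {"sources", "sinks", "channels", "sinkgroups"}  # assumed set


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_agent(props, agent):
    """Return a list of warning strings for the given agent name."""
    warnings = []
    prefix = agent + "."

    # Flag keys whose section name is misspelled (such keys are likely ignored).
    for key in props:
        if key.startswith(prefix):
            section = key[len(prefix):].split(".", 1)[0]
            if section not in VALID_SECTIONS:
                warnings.append("unrecognized key: " + key)

    for channel in props.get(prefix + "channels", "").split():
        base = prefix + "channels." + channel + "."
        capacity = int(props.get(base + "capacity", 100))
        tx = int(props.get(base + "transactionCapacity", 100))
        if tx > capacity:
            warnings.append(
                f"{channel}: transactionCapacity {tx} > capacity {capacity}")
        # Sources list their channels under "channels", sinks under "channel".
        for kind, link in (("sources", "channels"), ("sinks", "channel")):
            for name in props.get(prefix + kind, "").split():
                comp = prefix + kind + "." + name + "."
                if channel not in props.get(comp + link, "").split():
                    continue
                batch = props.get(comp + "batchSize")
                if batch is not None and int(batch) > tx:
                    warnings.append(
                        f"{kind[:-1]} {name}: batchSize {batch} exceeds "
                        f"transactionCapacity {tx} of channel {channel}")
    return warnings


# A reduced copy of the values posted above.
SAMPLE = """
a4.sources = s1
a4.sinks = k4
a4.channels = c4
a4.sources.s1.channels = c4
a4.sources.s1.batchSize = 1000
a4.sinks.k4.channel = c4
a4.sinks.k4.batchSize = 80000
a4.channels.c4.capacity = 1000000
a4.channels.c4.transactionCapacity = 100000
"""

for message in check_agent(parse_properties(SAMPLE), "a4") or ["no issues found"]:
    print(message)
```

With the batch sizes and capacities posted here, these particular checks pass, so the sizes are at least consistent with each other. That does not say anything about whether the sink drains the channel fast enough.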
*Error:*
16/05/14 13:05:17 ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c4}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:132)
    at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 4 more
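
The ChannelFullException message says the sinks are likely not keeping up with the sources. A rough way to see why the failure can appear after minutes, hours, or a day is to estimate how long a channel of 1000000 events takes to fill when events are put slightly faster than they are taken. The sketch below is only that arithmetic. The function name `seconds_until_full` and all of the rates are hypothetical, chosen for illustration and not measured on this agent. It assumes the exception relates to the event-count capacity of the channel and ignores the channel's keep-alive wait.

```python
def seconds_until_full(capacity, queued, put_rate, take_rate):
    """Seconds until the channel holds `capacity` events.

    Rates are in events per second. Returns None when the sink takes
    events at least as fast as the source puts them.
    """
    net = put_rate - take_rate
    if net <= 0:
        return None
    return (capacity - queued) / net


CAPACITY = 1_000_000  # a4.channels.c4.capacity from the configuration

# Hypothetical (put_rate, take_rate) pairs in events per second.
for put_rate, take_rate in [(20_000, 15_000), (20_000, 19_900), (20_000, 19_990)]:
    t = seconds_until_full(CAPACITY, 0, put_rate, take_rate)
    print(f"put={put_rate}/s take={take_rate}/s -> full after {t:.0f} s")
```

Under these assumed rates the channel fills after 200 s, 10000 s (under 3 hours), and 100000 s (a little over a day). If this model applies, a larger capacity only delays the exception, and what matters is whether the Elasticsearch sink's sustained take rate matches the Kafka source's put rate.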