You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Muhammad Zeeshan Munir <mz...@gmail.com> on 2016/05/14 10:17:23 UTC

Unable to put batch - kafka.KafkaSource: KafkaSource EXCEPTION, {}

Hello,

I am receiving the following error from Kafka source to Elastic Search.
I have a virtual machine with 64 GB of RAM and I am running the flume agent
with 32 GB of heap size by using the following paramter -Xmx32000m.

I have tried experimenting different configurations by changing batch size
in kafka source and in elastic search sink. I have also tried changing
memory capacity, memory transaction capacity, byteSizeCapacity etc to
different values.

Everything runs fine for sometime and starts given except at some point of
time. Sometimes it lasts for few minutes, sometimes it last for hours and
sometimes it last for a day or so. Any help and advice will be highly
appreciated. Thank you in advance.

Configuration:

a4.sources = s1

a4.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

a4.sources.s1.zookeeperConnect = zk01:2181

a4.sources.s1.topic = topic1

a4.sources.s1.groupId = flume

a4.sources.s1.batchSize = 1000

a4.sources.s1.channels = c4

a4.sources.s1.consumer.timeout.ms = 1000

a4.sourcess1.backoffSleepIncrement=1000

#a4.sources.s1.batchDurationMillis=1000

a4.sources.s1.readSmallestOffset=false



#Sink

a4.sinks = k4

a4.sinks.k4.channel = c4

a4.sinks.k4.type=org.flume.sink.elasticsearch.ElasticSearchSink

a4.sinks.k4.client = transport

a4.sinks.k4.batchSize = 80000

a4.sinks.k4.hostNames = esnode:9300

a4.sinks.k4.indexName = index

a4.sinks.k4.indexType =  index-type

a4.sinks.k4.clusterName = es

a4.sinks.k4.serializer =
org.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

a4.sinks.k4.indexNameBuilder =
org.flume.sink.elasticsearch.TimeBasedIndexNameBuilder


a4.channels = c4

a4.channels.c4.type = memory

a4.channels.c4.capacity = 1000000

a4.channels.c4.transactionCapacity = 100000

a4.channels.c4.byteCapacityBufferPercentage = 20

a4.channels.c4.byteCapacity = 28000000000




*​Error:​*

16/05/14 13:05:17 ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}

org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: c4}

at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)

at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:132)

at
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)

at
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flume.ChannelFullException: Space for commit to queue
couldn't be acquired. Sinks are likely not keeping up with sources, or the
buffer size is too tight

at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)

at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)

at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)

... 4 more