You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Saurabh Sharma <sa...@nviz.com> on 2016/01/26 18:11:48 UTC

Flume 1.5.0 Kafka topic Sink Stuck With Channel Is Full Messages

Hi,

We are using flume1.6.0 to transfer the data from spool directory source to Kafka topic sink using memory channel. It started well and we could consume the data for few files.
But all of a sudden it started giving following issues with memory channel. We tried increasing memory channel capacity as well but no luck.

26 Jan 2016 22:29:36,555 INFO  [pool-3-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents:238)  - Last read was never committed - resetting mark position.
26 Jan 2016 22:29:39,600 WARN  [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:239)  - The channel is full, and cannot write data now. The source will try again after 4000 milliseconds

We have following flume configuration for source, channel and sink. Could you please help us to figure out the issue.

agent.sources = spoolDir
agent.channels = memoryChannel
agent.sinks = sinks
agent.sources.spoolDir.interceptors = i1

#Source configuration
agent.sources.spoolDir.type = spooldir
agent.sources.spoolDir.spoolDir = D:/CommerceSense/spoolDir
agent.sources.spoolDir.fileHeader = true
agent.sources.spoolDir.basenameHeader = true
agent.sources.spoolDir.deserializer = LINE
agent.sources.spoolDir.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.spoolDir.interceptors.i1.preserveExisting = true
agent.sources.spoolDir.interceptors.i1.prefix = test
agent.sources.spoolDir.channels = memoryChannel


#Channel Configuration
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000

#Sink Configuration
agent.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink.topic = cdnLogsTopic
agent.sinks.sink.brokerList = localhost:9092
agent.sinks.sink.batchSize = 100
agent.sinks.sink.channel = memoryChannel


Thanks,
Saurabh