Posted to user@flume.apache.org by Pravesh Bhardwaj <pr...@gmail.com> on 2016/02/18 04:44:24 UTC

Control character stuffing when using Kafka Sink

Greetings,

I am trying to use Flume to read my source files (pipe-delimited text
files) and feed them to Kafka.

All of the plumbing seems to work fine, and all the records are getting
into Kafka successfully. However, Flume seems to add NUL and STX control
characters at the start of each data line.

This is a bit of an issue for me, because I eventually stream all the
data from Kafka to Amazon S3 and finally into Amazon Redshift. Because of
these control characters, my data loads into Redshift (COPY command) are
failing.
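To confirm exactly which bytes are being prepended before the Redshift COPY runs, a quick check like the one below can help. This is a minimal sketch, assuming the Kafka records have already been exported to a local file with one record per line; the function names `find_control_prefixed` and `scan_file` and the sample records are hypothetical and are not part of Flume, Kafka, or Redshift.

```python
# Hypothetical diagnostic: report records whose leading bytes are NUL (0x00) or STX (0x02).
NUL, STX = 0x00, 0x02


def find_control_prefixed(records):
    """Return (index, leading_bytes) for each bytes record starting with NUL/STX."""
    hits = []
    for i, rec in enumerate(records):
        j = 0
        # Indexing a bytes object yields ints, so compare against byte values.
        while j < len(rec) and rec[j] in (NUL, STX):
            j += 1
        if j:
            hits.append((i, rec[:j]))
    return hits


def scan_file(path):
    """Read a local export (path is an assumption) in binary mode and scan each line."""
    with open(path, "rb") as f:
        return find_control_prefixed(f.read().splitlines())


if __name__ == "__main__":
    # Hypothetical pipe-delimited sample: the first record carries the NUL+STX prefix.
    sample = [b"\x00\x02col1|col2|col3", b"col1|col2|col3"]
    for idx, prefix in find_control_prefixed(sample):
        print(idx, prefix.hex())
```

Reading the file in binary mode keeps the control bytes intact, so the hex output shows the exact prefix rather than whatever a terminal or text decoder would render.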

I have spent considerable time researching this problem on the internet,
but no luck so far. Can I somehow instruct Flume not to add these
characters to my source data?

My Flume configuration is given below. I have tried both the "exec" and
"spoolDir" sources, and neither changed anything.

kafka-agent.channels=ch1

kafka-agent.channels.ch1.type=org.apache.flume.channel.kafka.KafkaChannel
kafka-agent.channels.ch1.brokerList=localhost:9092
kafka-agent.channels.ch1.topic=call-center-dimension
kafka-agent.channels.ch1.zookeeperConnect=localhost:2181
kafka-agent.channels.ch1.capacity=10000
kafka-agent.channels.ch1.transactionCapacity=10000
kafka-agent.channels.ch1.parseAsFlumeEvent = true
kafka-agent.channels.ch1.kafka.serializer.class=kafka.serializer.DefaultEncoder

kafka-agent.sources=tail

#kafka-agent.sources.tail.type=spooldir
#kafka-agent.sources.tail.channels=ch1
#kafka-agent.sources.tail.spoolDir=/home/ec2-user/flumespool
#kafka-agent.sources.tail.fileHeader=false

kafka-agent.sources.tail.type=exec
kafka-agent.sources.tail.channels=ch1
kafka-agent.sources.tail.shell=/bin/bash -c
kafka-agent.sources.tail.command=cat /tmp/call_center_dimension_1.out

kafka-agent.sinks=sink1
kafka-agent.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
kafka-agent.sinks.sink1.brokerList=localhost:9092
kafka-agent.sinks.sink1.topic=kafka
kafka-agent.sinks.sink1.channel=ch1
kafka-agent.sinks.sink1.batchSize=5
kafka-agent.sinks.sink1.kafka.serializer.class=kafka.serializer.StringEncoder

Any help is appreciated.

Thanks