Posted to dev@flume.apache.org by "Gonzalo Herreros (JIRA)" <ji...@apache.org> on 2015/10/16 09:29:05 UTC

[jira] [Commented] (FLUME-2814) flume kafka sink does not write events to configured sink topic when source is also from other topic of kafka

    [ https://issues.apache.org/jira/browse/FLUME-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960308#comment-14960308 ] 

Gonzalo Herreros commented on FLUME-2814:
-----------------------------------------

Having that header used in the sink is arguably useful, but either it shouldn't be set by the source or it shouldn't apply when a topic is explicitly chosen in the configuration.
Having the behavior you describe as the default is not good.

The way people are working around this is by using an interceptor that changes or removes the header, or by using a Kafka channel.
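
To make the interaction concrete, here is a minimal sketch in plain Java, with no Flume dependency. It mirrors the sink-side lookup quoted in the report below and shows what removing the header before the sink changes. The class name, the removeTopicHeader helper, and the assumption that the header key is the literal string "topic" are illustrative only; a real interceptor would implement Flume's Interceptor interface instead.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicHeaderSketch {
    // Assumption: the header key shared by source and sink is "topic".
    static final String TOPIC_HDR = "topic";

    // Mirrors the sink-side lookup quoted in the report:
    // a topic header wins, the configured topic is only the fallback.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String eventTopic = headers.get(TOPIC_HDR);
        return (eventTopic == null) ? configuredTopic : eventTopic;
    }

    // Hypothetical stand-in for an interceptor that drops the header
    // before the event reaches the sink. Returns a copy; the input is untouched.
    static Map<String, String> removeTopicHeader(Map<String, String> headers) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.remove(TOPIC_HDR);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TOPIC_HDR, "sourcetopic"); // what the Kafka source sets

        // Header present: the configured sink topic is ignored.
        System.out.println(resolveTopic(headers, "destinationtopic"));

        // Header removed: the configured sink topic is used.
        System.out.println(resolveTopic(removeTopicHeader(headers), "destinationtopic"));
    }
}
```

With the header in place the lookup yields the source topic; once the header is stripped it falls back to the configured one, which is the effect the interceptor workaround relies on.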

> flume kafka sink does not write events to configured sink topic when source is also from other topic of kafka
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2814
>                 URL: https://issues.apache.org/jira/browse/FLUME-2814
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.5.0
>            Reporter: Manohar
>
> I was testing a case where a Flume agent reads from a Kafka source on topic 'sourcetopic' and its sink is a Kafka sink configured to write to a different topic, 'destinationtopic':
> tier1.sources  = source1
> tier1.channels = channel1
> tier1.sinks    = sink1
> tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.zookeeperConnect = localhost:2181
> tier1.sources.source1.topic = sourcetopic
> tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sink1.topic = destinationtopic
> tier1.sinks.sink1.brokerList = localhost:9092
> tier1.sinks.sink1.channel = channel1
> tier1.channels.channel1.type = memory
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 1000
> With these settings I noticed that events were not written to 'destinationtopic'.
> After debugging the agent I found that the Kafka source puts the topic name in a header:
> headers.put(KafkaSourceConstants.TOPIC, topic);
> In the sink, a check is made to see whether the headers contain a topic. If one exists, the sink takes the topic name from the header and writes the event to that topic, thereby discarding the configured sink topic, i.e. destinationtopic.
> Here is the code snippet that does this. Even though the variable topic is set to destinationtopic, the header contains a topic, so the Kafka sink takes the topic name from the header and writes the event to that topic, i.e. back to the source topic:
>         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>           eventTopic = topic;
>         }


