You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Mohit Durgapal <du...@gmail.com> on 2014/09/16 14:55:03 UTC

issue with failover sinks in flume

We have a two stage topology in flume in which we are in the first tier
adding headers based on hash value of a field in the event.
The hashing logic is added in the interceptor in Tier 1 of flume topology
which basically sets a header field. And then we use multiplexing to direct
events to Tier 2  based on that header field through selector.
 In the second tier we are storing the events locally using file_roll and
storing the same events in hdfs also.

Everything works fine when we are not using the failover sinks. When we add
the failover sink configuration in the first tier our hashing logic gets
overriden. That means even when all the machines in our Tier 2  are active
and running, some events which were meant for flume agent1(based on hashing
& multiplexing) go to agent 2.

Also we are performing this test on three machines. One machine for Tier 1(
lets say machine A) and two machines(lets say machine B & C) for Tier 2. In
Tier 2 for flume agent on machine B, the machine C acts as the failover
backup and for flume agent on machine C, the machine B acts as the failover
backup.

Any idea what could be wrong with this configuration?

Below are the tier wise configurations:

*Tier 1:*

agent1tier1.sources = tcpsrc
agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
avro-forward-ch02 avro-forward-ch02backup

agent1tier1.channels = channelbucket01 channelbucket02
agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file


agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp




#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
bucket01,bucket02



#################### END OF INTERCEPTOR ##############################



####################### SELECTOR ###########################

agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01

##################### END OF SELECTOR #############################



#################### CHANNELS ##############################

agent1tier1.channels.channelbucket01.checkpointDir =
/home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs =
/home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data

agent1tier1.channels.channelbucket02.checkpointDir =
/home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs =
/home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data


#################### CHANNELS ##############################






################## CHANNELS CAPACITY ############################


agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000

agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000


################## END OF CHANNELS CAPACITY ############################



 # avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

 # avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000



agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup =
10
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000



agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup =
11
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000



*Tier 2:*

tier2.sources  = avro-AppSrv-source
tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt
channelhdfsdel
tier2.channels.channelimp.type = file
tier2.channels.channelconv.type = file
tier2.channels.channelclk.type = file
tier2.channels.channelrt.type = file
tier2.channels.channelhdfsrt.type = file
tier2.channels.channelhdfsdel.type = file

# For each source, channel, and sink, set
# standard properties.
# properties of avro-AppSrv-source
tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
channelclk channelrt channelhdfsrt channelhdfsdel
tier2.sources.avro-AppSrv-source.type = avro
tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
tier2.sources.avro-AppSrv-source.port = 10000






tier2.sources.avro-AppSrv-source.selector.type=multiplexing
tier2.sources.avro-AppSrv-source.selector.header = rectype
tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
channelhdfsdel

tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
channelhdfsrt


tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel



tier2.sinks.impsink.type = file_roll
tier2.sinks.impsink.channel = channelimp
tier2.sinks.impsink.sink.directory = /var/log/flume/imp
tier2.sinks.impsink.sink.rollInterval=60

tier2.sinks.convsink.type = file_roll
tier2.sinks.convsink.channel = channelconv
tier2.sinks.convsink.sink.directory = /var/log/flume/conv
tier2.sinks.convsink.sink.rollInterval=60

tier2.sinks.clksink.type = file_roll
tier2.sinks.clksink.channel = channelclk
tier2.sinks.clksink.sink.directory = /var/log/flume/clk
tier2.sinks.clksink.sink.rollInterval=60


tier2.sinks.rtsink.type = file_roll
tier2.sinks.rtsink.channel = channelrt
tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
tier2.sinks.rtsink.sink.rollInterval=60


#################### CHANNELS ##############################

tier2.channels.channelimp.checkpointDir =
/home/flume/channelimp/file-channel/checkpoint
tier2.channels.channelimp.dataDirs =
/home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data


tier2.channels.channelclk.checkpointDir =
/home/flume/channelclk/file-channel/checkpoint
tier2.channels.channelclk.dataDirs =
/home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data

tier2.channels.channelconv.checkpointDir =
/home/flume/channelconv/file-channel/checkpoint
tier2.channels.channelconv.dataDirs =
/home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data

tier2.channels.channelrt.checkpointDir =
/home/flume/channelrt/file-channel/checkpoint
tier2.channels.channelrt.dataDirs =
/home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data

tier2.channels.channelhdfsrt.checkpointDir =
/home/flume/channelhdfsrt/file-channel/checkpoint
tier2.channels.channelhdfsrt.dataDirs =
/home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data

tier2.channels.channelhdfsdel.checkpointDir =
/home/flume/channelhdfsdel/file-channel/checkpoint
tier2.channels.channelhdfsdel.dataDirs =
/home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data




#################### CHANNELS ##############################


tier2.sinks.hdfssinkrt.type = hdfs
tier2.sinks.hdfssinkrt.channel = channelhdfsrt
tier2.sinks.hdfssinkrt.hdfs.path =
hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
# Roll based on the block size only
tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000


tier2.sinks.hdfssinkdel.type = hdfs
tier2.sinks.hdfssinkdel.channel = channelhdfsdel
tier2.sinks.hdfssinkdel.hdfs.path =
hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
# Roll based on the block size only
tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
#################### END OF SINKS ##############################

Re: issue with failover sinks in flume

Posted by Mohit Durgapal <du...@gmail.com>.
Hi Hari,

Were you able to find if there's something wrong with the config ?


Regards
Mohit

On Thu, Sep 18, 2014 at 10:44 AM, Mohit Durgapal <du...@gmail.com>
wrote:

> Hi Hari,
>
> This is our latest config:
>
>
>
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
> avro-forward-ch02 avro-forward-ch02backup
>
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
>
>
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors=i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>
>
>
>
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
> bucket01,bucket02
>
>
>
> #################### END OF INTERCEPTOR ##############################
>
>
>
> ####################### SELECTOR ###########################
>
> agent1tier1.sources.tcpsrc.selector.type=multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>
> ##################### END OF SELECTOR #############################
>
>
>
> #################### CHANNELS ##############################
>
> agent1tier1.channels.channelbucket01.checkpointDir =
> /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs =
> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>
> agent1tier1.channels.channelbucket02.checkpointDir =
> /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs =
> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>
>
> #################### CHANNELS ##############################
>
>
>
>
>
>
> ################## CHANNELS CAPACITY ############################
>
>
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
>
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
>
>
> ################## END OF CHANNELS CAPACITY ############################
>
>
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>
>
>
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
> avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
> = 2
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
>
>
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
> avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
> = 1
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>
>
> Regards
> Mohit
>
> On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <
> hshreedharan@cloudera.com> wrote:
>
>> Can you send your latest config?
>>
>> Thanks,
>> Hari
>>
>>
>> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <du...@gmail.com>
>> wrote:
>>
>>>  We have a two stage topology in flume in which we are in the first
>>> tier adding headers based on hash value of a field in the event.
>>> The hashing logic is added in the interceptor in Tier 1 of flume
>>> topology which basically sets a header field. And then we use multiplexing
>>> to direct events to Tier 2  based on that header field through selector.
>>>  In the second tier we are storing the events locally using file_roll
>>> and storing the same events in hdfs also.
>>>
>>> Everything works fine when we are not using the failover sinks. When we
>>> add the failover sink configuration in the first tier our hashing logic
>>> gets overriden. That means even when all the machines in our Tier 2  are
>>> active and running, some events which were meant for flume agent1(based on
>>> hashing & multiplexing) go to agent 2.
>>>
>>> Also we are performing this test on three machines. One machine for Tier
>>> 1( lets say machine A) and two machines(lets say machine B & C) for Tier 2.
>>> In Tier 2 for flume agent on machine B, the machine C acts as the failover
>>> backup and for flume agent on machine C, the machine B acts as the failover
>>> backup.
>>>
>>> Any idea what could be wrong with this configuration?
>>>
>>> Below are the tier wise configurations:
>>>
>>> *Tier 1:*
>>>
>>> agent1tier1.sources = tcpsrc
>>> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
>>> avro-forward-ch02 avro-forward-ch02backup
>>>
>>> agent1tier1.channels = channelbucket01 channelbucket02
>>> agent1tier1.channels.channelbucket01.type = file
>>> agent1tier1.channels.channelbucket02.type = file
>>>
>>>
>>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>>> agent1tier1.sources.tcpsrc.type = syslogtcp
>>> agent1tier1.sources.tcpsrc.port = 5149
>>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>>> agent1tier1.sources.tcpsrc.interceptors=i1
>>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>>
>>>
>>>
>>>
>>> #################### INTERCEPTOR ##############################
>>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>>
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
>>> bucket01,bucket02
>>>
>>>
>>>
>>> #################### END OF INTERCEPTOR ##############################
>>>
>>>
>>>
>>> ####################### SELECTOR ###########################
>>>
>>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>>> agent1tier1.sources.tcpsrc.selector.header = bucket
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>>
>>> ##################### END OF SELECTOR #############################
>>>
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> agent1tier1.channels.channelbucket01.checkpointDir =
>>> /home/flume/channelbucket01/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket01.dataDirs =
>>> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>>
>>> agent1tier1.channels.channelbucket02.checkpointDir =
>>> /home/flume/channelbucket02/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket02.dataDirs =
>>> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>> ################## CHANNELS CAPACITY ############################
>>>
>>>
>>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>>
>>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>>
>>>
>>> ################## END OF CHANNELS CAPACITY ############################
>>>
>>>
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01.type = avro
>>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02.type = avro
>>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch1
>>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
>>> avro-forward-ch01backup
>>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
>>> = 10
>>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch2
>>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
>>> avro-forward-ch02backup
>>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
>>> = 11
>>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> *Tier 2:*
>>>
>>> tier2.sources  = avro-AppSrv-source
>>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>>> tier2.channels = channelconv channelimp channelclk channelrt
>>> channelhdfsrt channelhdfsdel
>>> tier2.channels.channelimp.type = file
>>> tier2.channels.channelconv.type = file
>>> tier2.channels.channelclk.type = file
>>> tier2.channels.channelrt.type = file
>>> tier2.channels.channelhdfsrt.type = file
>>> tier2.channels.channelhdfsdel.type = file
>>>
>>> # For each source, channel, and sink, set
>>> # standard properties.
>>> # properties of avro-AppSrv-source
>>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
>>> channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.type = avro
>>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>>> tier2.sources.avro-AppSrv-source.port = 10000
>>>
>>>
>>>
>>>
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
>>> channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
>>> channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
>>> channelhdfsdel
>>>
>>> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
>>> channelhdfsrt
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>>
>>>
>>>
>>> tier2.sinks.impsink.type = file_roll
>>> tier2.sinks.impsink.channel = channelimp
>>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>>> tier2.sinks.impsink.sink.rollInterval=60
>>>
>>> tier2.sinks.convsink.type = file_roll
>>> tier2.sinks.convsink.channel = channelconv
>>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>>> tier2.sinks.convsink.sink.rollInterval=60
>>>
>>> tier2.sinks.clksink.type = file_roll
>>> tier2.sinks.clksink.channel = channelclk
>>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>>> tier2.sinks.clksink.sink.rollInterval=60
>>>
>>>
>>> tier2.sinks.rtsink.type = file_roll
>>> tier2.sinks.rtsink.channel = channelrt
>>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>>> tier2.sinks.rtsink.sink.rollInterval=60
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> tier2.channels.channelimp.checkpointDir =
>>> /home/flume/channelimp/file-channel/checkpoint
>>> tier2.channels.channelimp.dataDirs =
>>> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>>
>>>
>>> tier2.channels.channelclk.checkpointDir =
>>> /home/flume/channelclk/file-channel/checkpoint
>>> tier2.channels.channelclk.dataDirs =
>>> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>>
>>> tier2.channels.channelconv.checkpointDir =
>>> /home/flume/channelconv/file-channel/checkpoint
>>> tier2.channels.channelconv.dataDirs =
>>> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>>
>>> tier2.channels.channelrt.checkpointDir =
>>> /home/flume/channelrt/file-channel/checkpoint
>>> tier2.channels.channelrt.dataDirs =
>>> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsrt.checkpointDir =
>>> /home/flume/channelhdfsrt/file-channel/checkpoint
>>> tier2.channels.channelhdfsrt.dataDirs =
>>> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsdel.checkpointDir =
>>> /home/flume/channelhdfsdel/file-channel/checkpoint
>>> tier2.channels.channelhdfsdel.dataDirs =
>>> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>>
>>>
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>>
>>> tier2.sinks.hdfssinkrt.type = hdfs
>>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>>> tier2.sinks.hdfssinkrt.hdfs.path =
>>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>>
>>>
>>> tier2.sinks.hdfssinkdel.type = hdfs
>>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>>> tier2.sinks.hdfssinkdel.hdfs.path =
>>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>>> #################### END OF SINKS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: issue with failover sinks in flume

Posted by Mohit Durgapal <du...@gmail.com>.
Hi Hari,

This is our latest config:


agent1tier1.sources = tcpsrc
agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
avro-forward-ch02 avro-forward-ch02backup

agent1tier1.channels = channelbucket01 channelbucket02
agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file


agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp




#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
bucket01,bucket02



#################### END OF INTERCEPTOR ##############################



####################### SELECTOR ###########################

agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01

##################### END OF SELECTOR #############################



#################### CHANNELS ##############################

agent1tier1.channels.channelbucket01.checkpointDir =
/home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs =
/home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data

agent1tier1.channels.channelbucket02.checkpointDir =
/home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs =
/home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data


#################### CHANNELS ##############################






################## CHANNELS CAPACITY ############################


agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000

agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000


################## END OF CHANNELS CAPACITY ############################



 # avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

 # avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000



agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup =
2
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000



agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup =
1
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000


Regards
Mohit

On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <
hshreedharan@cloudera.com> wrote:

> Can you send your latest config?
>
> Thanks,
> Hari
>
>
> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <du...@gmail.com>
> wrote:
>
>>  We have a two stage topology in flume in which we are in the first tier
>> adding headers based on hash value of a field in the event.
>> The hashing logic is added in the interceptor in Tier 1 of flume topology
>> which basically sets a header field. And then we use multiplexing to direct
>> events to Tier 2  based on that header field through selector.
>>  In the second tier we are storing the events locally using file_roll and
>> storing the same events in hdfs also.
>>
>> Everything works fine when we are not using the failover sinks. When we
>> add the failover sink configuration in the first tier our hashing logic
>> gets overriden. That means even when all the machines in our Tier 2  are
>> active and running, some events which were meant for flume agent1(based on
>> hashing & multiplexing) go to agent 2.
>>
>> Also we are performing this test on three machines. One machine for Tier
>> 1( lets say machine A) and two machines(lets say machine B & C) for Tier 2.
>> In Tier 2 for flume agent on machine B, the machine C acts as the failover
>> backup and for flume agent on machine C, the machine B acts as the failover
>> backup.
>>
>> Any idea what could be wrong with this configuration?
>>
>> Below are the tier wise configurations:
>>
>> *Tier 1:*
>>
>> agent1tier1.sources = tcpsrc
>> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
>> avro-forward-ch02 avro-forward-ch02backup
>>
>> agent1tier1.channels = channelbucket01 channelbucket02
>> agent1tier1.channels.channelbucket01.type = file
>> agent1tier1.channels.channelbucket02.type = file
>>
>>
>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>> agent1tier1.sources.tcpsrc.type = syslogtcp
>> agent1tier1.sources.tcpsrc.port = 5149
>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>> agent1tier1.sources.tcpsrc.interceptors=i1
>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>
>>
>>
>>
>> #################### INTERCEPTOR ##############################
>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
>> bucket01,bucket02
>>
>>
>>
>> #################### END OF INTERCEPTOR ##############################
>>
>>
>>
>> ####################### SELECTOR ###########################
>>
>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>> agent1tier1.sources.tcpsrc.selector.header = bucket
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>
>> ##################### END OF SELECTOR #############################
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>> agent1tier1.channels.channelbucket01.checkpointDir =
>> /home/flume/channelbucket01/file-channel/checkpoint
>> agent1tier1.channels.channelbucket01.dataDirs =
>> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>
>> agent1tier1.channels.channelbucket02.checkpointDir =
>> /home/flume/channelbucket02/file-channel/checkpoint
>> agent1tier1.channels.channelbucket02.dataDirs =
>> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>>
>>
>>
>>
>> ################## CHANNELS CAPACITY ############################
>>
>>
>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>
>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>
>>
>> ################## END OF CHANNELS CAPACITY ############################
>>
>>
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01.type = avro
>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02.type = avro
>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch1
>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
>> avro-forward-ch01backup
>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
>> = 10
>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch2
>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
>> avro-forward-ch02backup
>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
>> = 11
>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>
>>
>>
>> *Tier 2:*
>>
>> tier2.sources  = avro-AppSrv-source
>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>> tier2.channels = channelconv channelimp channelclk channelrt
>> channelhdfsrt channelhdfsdel
>> tier2.channels.channelimp.type = file
>> tier2.channels.channelconv.type = file
>> tier2.channels.channelclk.type = file
>> tier2.channels.channelrt.type = file
>> tier2.channels.channelhdfsrt.type = file
>> tier2.channels.channelhdfsdel.type = file
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> # properties of avro-AppSrv-source
>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
>> channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.sources.avro-AppSrv-source.type = avro
>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>> tier2.sources.avro-AppSrv-source.port = 10000
>>
>>
>>
>>
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
>> channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
>> channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
>> channelhdfsdel
>>
>> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
>> channelhdfsrt
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>
>>
>>
>> tier2.sinks.impsink.type = file_roll
>> tier2.sinks.impsink.channel = channelimp
>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>> tier2.sinks.impsink.sink.rollInterval=60
>>
>> tier2.sinks.convsink.type = file_roll
>> tier2.sinks.convsink.channel = channelconv
>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>> tier2.sinks.convsink.sink.rollInterval=60
>>
>> tier2.sinks.clksink.type = file_roll
>> tier2.sinks.clksink.channel = channelclk
>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>> tier2.sinks.clksink.sink.rollInterval=60
>>
>>
>> tier2.sinks.rtsink.type = file_roll
>> tier2.sinks.rtsink.channel = channelrt
>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>> tier2.sinks.rtsink.sink.rollInterval=60
>>
>>
>> #################### CHANNELS ##############################
>>
>> tier2.channels.channelimp.checkpointDir =
>> /home/flume/channelimp/file-channel/checkpoint
>> tier2.channels.channelimp.dataDirs =
>> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>
>>
>> tier2.channels.channelclk.checkpointDir =
>> /home/flume/channelclk/file-channel/checkpoint
>> tier2.channels.channelclk.dataDirs =
>> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>
>> tier2.channels.channelconv.checkpointDir =
>> /home/flume/channelconv/file-channel/checkpoint
>> tier2.channels.channelconv.dataDirs =
>> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>
>> tier2.channels.channelrt.checkpointDir =
>> /home/flume/channelrt/file-channel/checkpoint
>> tier2.channels.channelrt.dataDirs =
>> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsrt.checkpointDir =
>> /home/flume/channelhdfsrt/file-channel/checkpoint
>> tier2.channels.channelhdfsrt.dataDirs =
>> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsdel.checkpointDir =
>> /home/flume/channelhdfsdel/file-channel/checkpoint
>> tier2.channels.channelhdfsdel.dataDirs =
>> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>> tier2.sinks.hdfssinkrt.type = hdfs
>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>> tier2.sinks.hdfssinkrt.hdfs.path =
>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>
>>
>> tier2.sinks.hdfssinkdel.type = hdfs
>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>> tier2.sinks.hdfssinkdel.hdfs.path =
>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>> #################### END OF SINKS ##############################
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>

Re: issue with failover sinks in flume

Posted by Hari Shreedharan <hs...@cloudera.com>.
Can you send your latest config?


Thanks,
Hari

On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <du...@gmail.com>
wrote:

> We have a two stage topology in flume in which we are in the first tier
> adding headers based on hash value of a field in the event.
> The hashing logic is added in the interceptor in Tier 1 of flume topology
> which basically sets a header field. And then we use multiplexing to direct
> events to Tier 2  based on that header field through selector.
>  In the second tier we are storing the events locally using file_roll and
> storing the same events in hdfs also.
> Everything works fine when we are not using the failover sinks. When we add
> the failover sink configuration in the first tier our hashing logic gets
> overriden. That means even when all the machines in our Tier 2  are active
> and running, some events which were meant for flume agent1(based on hashing
> & multiplexing) go to agent 2.
> Also we are performing this test on three machines. One machine for Tier 1(
> lets say machine A) and two machines(lets say machine B & C) for Tier 2. In
> Tier 2 for flume agent on machine B, the machine C acts as the failover
> backup and for flume agent on machine C, the machine B acts as the failover
> backup.
> Any idea what could be wrong with this configuration?
> Below are the tier wise configurations:
> *Tier 1:*
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
> avro-forward-ch02 avro-forward-ch02backup
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors=i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors=logsintercept
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
> bucket01,bucket02
> #################### END OF INTERCEPTOR ##############################
> ####################### SELECTOR ###########################
> agent1tier1.sources.tcpsrc.selector.type=multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
> ##################### END OF SELECTOR #############################
> #################### CHANNELS ##############################
> agent1tier1.channels.channelbucket01.checkpointDir =
> /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs =
> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
> agent1tier1.channels.channelbucket02.checkpointDir =
> /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs =
> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
> #################### CHANNELS ##############################
> ################## CHANNELS CAPACITY ############################
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
> ################## END OF CHANNELS CAPACITY ############################
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
> avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup =
> 10
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
> avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup =
> 11
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
> *Tier 2:*
> tier2.sources  = avro-AppSrv-source
> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt
> channelhdfsdel
> tier2.channels.channelimp.type = file
> tier2.channels.channelconv.type = file
> tier2.channels.channelclk.type = file
> tier2.channels.channelrt.type = file
> tier2.channels.channelhdfsrt.type = file
> tier2.channels.channelhdfsdel.type = file
> # For each source, channel, and sink, set
> # standard properties.
> # properties of avro-AppSrv-source
> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
> channelclk channelrt channelhdfsrt channelhdfsdel
> tier2.sources.avro-AppSrv-source.type = avro
> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
> tier2.sources.avro-AppSrv-source.port = 10000
> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
> tier2.sources.avro-AppSrv-source.selector.header = rectype
> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
> channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
> channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
> channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
> channelhdfsrt
> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
> tier2.sinks.impsink.type = file_roll
> tier2.sinks.impsink.channel = channelimp
> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
> tier2.sinks.impsink.sink.rollInterval=60
> tier2.sinks.convsink.type = file_roll
> tier2.sinks.convsink.channel = channelconv
> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
> tier2.sinks.convsink.sink.rollInterval=60
> tier2.sinks.clksink.type = file_roll
> tier2.sinks.clksink.channel = channelclk
> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
> tier2.sinks.clksink.sink.rollInterval=60
> tier2.sinks.rtsink.type = file_roll
> tier2.sinks.rtsink.channel = channelrt
> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
> tier2.sinks.rtsink.sink.rollInterval=60
> #################### CHANNELS ##############################
> tier2.channels.channelimp.checkpointDir =
> /home/flume/channelimp/file-channel/checkpoint
> tier2.channels.channelimp.dataDirs =
> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
> tier2.channels.channelclk.checkpointDir =
> /home/flume/channelclk/file-channel/checkpoint
> tier2.channels.channelclk.dataDirs =
> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
> tier2.channels.channelconv.checkpointDir =
> /home/flume/channelconv/file-channel/checkpoint
> tier2.channels.channelconv.dataDirs =
> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
> tier2.channels.channelrt.checkpointDir =
> /home/flume/channelrt/file-channel/checkpoint
> tier2.channels.channelrt.dataDirs =
> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
> tier2.channels.channelhdfsrt.checkpointDir =
> /home/flume/channelhdfsrt/file-channel/checkpoint
> tier2.channels.channelhdfsrt.dataDirs =
> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
> tier2.channels.channelhdfsdel.checkpointDir =
> /home/flume/channelhdfsdel/file-channel/checkpoint
> tier2.channels.channelhdfsdel.dataDirs =
> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
> #################### CHANNELS ##############################
> tier2.sinks.hdfssinkrt.type = hdfs
> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
> tier2.sinks.hdfssinkrt.hdfs.path =
> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
> # Roll based on the block size only
> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
> tier2.sinks.hdfssinkdel.type = hdfs
> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
> tier2.sinks.hdfssinkdel.hdfs.path =
> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
> # Roll based on the block size only
> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
> #################### END OF SINKS ##############################

Re: issue with failover sinks in flume

Posted by Mohit Durgapal <du...@gmail.com>.
Has anyone ever faced similar problems with sink failover?

On Wed, Sep 17, 2014 at 11:25 AM, Mohit Durgapal <du...@gmail.com>
wrote:

> Hi Hari,
>
> Even after inverting the priorities the same problem occurs. Are the sink
> priorities specific to a sink group or is it can be defined just once for
> the whole agent? If in an agent I use one sink in two sink groups and set
> different priorities, will the sink's priority in the last group override
> it's priorities in earlier groups?
>
> Regards
> Mohit
>
> On Tue, Sep 16, 2014 at 11:45 PM, Hari Shreedharan <
> hshreedharan@cloudera.com> wrote:
>
>> Looking at your sinks, it looks like backup of one channel pushes to the
>> primary of the other. Your sink group priorities are inverted. The higher
>> the value of the priority, earlier that sink will get picked up for
>> processing. So sink with priority 11 gets picked up before sink with
>> priority 1.
>>
>>
>>
>> On Tue, Sep 16, 2014 at 5:55 AM, Mohit Durgapal <du...@gmail.com>
>> wrote:
>>
>>> We have a two stage topology in flume in which we are in the first tier
>>> adding headers based on hash value of a field in the event.
>>> The hashing logic is added in the interceptor in Tier 1 of flume
>>> topology which basically sets a header field. And then we use multiplexing
>>> to direct events to Tier 2  based on that header field through selector.
>>>  In the second tier we are storing the events locally using file_roll
>>> and storing the same events in hdfs also.
>>>
>>> Everything works fine when we are not using the failover sinks. When we
>>> add the failover sink configuration in the first tier our hashing logic
>>> gets overriden. That means even when all the machines in our Tier 2  are
>>> active and running, some events which were meant for flume agent1(based on
>>> hashing & multiplexing) go to agent 2.
>>>
>>> Also we are performing this test on three machines. One machine for Tier
>>> 1( lets say machine A) and two machines(lets say machine B & C) for Tier 2.
>>> In Tier 2 for flume agent on machine B, the machine C acts as the failover
>>> backup and for flume agent on machine C, the machine B acts as the failover
>>> backup.
>>>
>>> Any idea what could be wrong with this configuration?
>>>
>>> Below are the tier wise configurations:
>>>
>>> *Tier 1:*
>>>
>>> agent1tier1.sources = tcpsrc
>>> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
>>> avro-forward-ch02 avro-forward-ch02backup
>>>
>>> agent1tier1.channels = channelbucket01 channelbucket02
>>> agent1tier1.channels.channelbucket01.type = file
>>> agent1tier1.channels.channelbucket02.type = file
>>>
>>>
>>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>>> agent1tier1.sources.tcpsrc.type = syslogtcp
>>> agent1tier1.sources.tcpsrc.port = 5149
>>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>>> agent1tier1.sources.tcpsrc.interceptors=i1
>>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>>
>>>
>>>
>>>
>>> #################### INTERCEPTOR ##############################
>>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>>
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
>>> bucket01,bucket02
>>>
>>>
>>>
>>> #################### END OF INTERCEPTOR ##############################
>>>
>>>
>>>
>>> ####################### SELECTOR ###########################
>>>
>>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>>> agent1tier1.sources.tcpsrc.selector.header = bucket
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>>
>>> ##################### END OF SELECTOR #############################
>>>
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> agent1tier1.channels.channelbucket01.checkpointDir =
>>> /home/flume/channelbucket01/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket01.dataDirs =
>>> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>>
>>> agent1tier1.channels.channelbucket02.checkpointDir =
>>> /home/flume/channelbucket02/file-channel/checkpoint
>>> agent1tier1.channels.channelbucket02.dataDirs =
>>> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>> ################## CHANNELS CAPACITY ############################
>>>
>>>
>>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>>
>>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>>
>>>
>>> ################## END OF CHANNELS CAPACITY ############################
>>>
>>>
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01.type = avro
>>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02.type = avro
>>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>>
>>>  # avro sink properties
>>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch1
>>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
>>> avro-forward-ch01backup
>>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
>>> = 10
>>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> agent1tier1.sinkgroups = grpch2
>>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
>>> avro-forward-ch02backup
>>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
>>> = 11
>>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>>
>>>
>>>
>>> *Tier 2:*
>>>
>>> tier2.sources  = avro-AppSrv-source
>>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>>> tier2.channels = channelconv channelimp channelclk channelrt
>>> channelhdfsrt channelhdfsdel
>>> tier2.channels.channelimp.type = file
>>> tier2.channels.channelconv.type = file
>>> tier2.channels.channelclk.type = file
>>> tier2.channels.channelrt.type = file
>>> tier2.channels.channelhdfsrt.type = file
>>> tier2.channels.channelhdfsdel.type = file
>>>
>>> # For each source, channel, and sink, set
>>> # standard properties.
>>> # properties of avro-AppSrv-source
>>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
>>> channelclk channelrt channelhdfsrt channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.type = avro
>>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>>> tier2.sources.avro-AppSrv-source.port = 10000
>>>
>>>
>>>
>>>
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
>>> channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
>>> channelhdfsdel
>>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
>>> channelhdfsdel
>>>
>>> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
>>> channelhdfsrt
>>>
>>>
>>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>>
>>>
>>>
>>> tier2.sinks.impsink.type = file_roll
>>> tier2.sinks.impsink.channel = channelimp
>>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>>> tier2.sinks.impsink.sink.rollInterval=60
>>>
>>> tier2.sinks.convsink.type = file_roll
>>> tier2.sinks.convsink.channel = channelconv
>>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>>> tier2.sinks.convsink.sink.rollInterval=60
>>>
>>> tier2.sinks.clksink.type = file_roll
>>> tier2.sinks.clksink.channel = channelclk
>>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>>> tier2.sinks.clksink.sink.rollInterval=60
>>>
>>>
>>> tier2.sinks.rtsink.type = file_roll
>>> tier2.sinks.rtsink.channel = channelrt
>>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>>> tier2.sinks.rtsink.sink.rollInterval=60
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>> tier2.channels.channelimp.checkpointDir =
>>> /home/flume/channelimp/file-channel/checkpoint
>>> tier2.channels.channelimp.dataDirs =
>>> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>>
>>>
>>> tier2.channels.channelclk.checkpointDir =
>>> /home/flume/channelclk/file-channel/checkpoint
>>> tier2.channels.channelclk.dataDirs =
>>> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>>
>>> tier2.channels.channelconv.checkpointDir =
>>> /home/flume/channelconv/file-channel/checkpoint
>>> tier2.channels.channelconv.dataDirs =
>>> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>>
>>> tier2.channels.channelrt.checkpointDir =
>>> /home/flume/channelrt/file-channel/checkpoint
>>> tier2.channels.channelrt.dataDirs =
>>> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsrt.checkpointDir =
>>> /home/flume/channelhdfsrt/file-channel/checkpoint
>>> tier2.channels.channelhdfsrt.dataDirs =
>>> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>>
>>> tier2.channels.channelhdfsdel.checkpointDir =
>>> /home/flume/channelhdfsdel/file-channel/checkpoint
>>> tier2.channels.channelhdfsdel.dataDirs =
>>> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>>
>>>
>>>
>>>
>>> #################### CHANNELS ##############################
>>>
>>>
>>> tier2.sinks.hdfssinkrt.type = hdfs
>>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>>> tier2.sinks.hdfssinkrt.hdfs.path =
>>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>>
>>>
>>> tier2.sinks.hdfssinkdel.type = hdfs
>>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>>> tier2.sinks.hdfssinkdel.hdfs.path =
>>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>>> # Roll based on the block size only
>>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>>> # seconds to wait before closing the file.
>>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>>> #################### END OF SINKS ##############################
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: issue with failover sinks in flume

Posted by Mohit Durgapal <du...@gmail.com>.
Hi Hari,

Even after inverting the priorities the same problem occurs. Are the sink
priorities specific to a sink group or is it can be defined just once for
the whole agent? If in an agent I use one sink in two sink groups and set
different priorities, will the sink's priority in the last group override
it's priorities in earlier groups?

Regards
Mohit

On Tue, Sep 16, 2014 at 11:45 PM, Hari Shreedharan <
hshreedharan@cloudera.com> wrote:

> Looking at your sinks, it looks like backup of one channel pushes to the
> primary of the other. Your sink group priorities are inverted. The higher
> the value of the priority, earlier that sink will get picked up for
> processing. So sink with priority 11 gets picked up before sink with
> priority 1.
>
>
>
> On Tue, Sep 16, 2014 at 5:55 AM, Mohit Durgapal <du...@gmail.com>
> wrote:
>
>> We have a two stage topology in flume in which we are in the first tier
>> adding headers based on hash value of a field in the event.
>> The hashing logic is added in the interceptor in Tier 1 of flume topology
>> which basically sets a header field. And then we use multiplexing to direct
>> events to Tier 2  based on that header field through selector.
>>  In the second tier we are storing the events locally using file_roll and
>> storing the same events in hdfs also.
>>
>> Everything works fine when we are not using the failover sinks. When we
>> add the failover sink configuration in the first tier our hashing logic
>> gets overriden. That means even when all the machines in our Tier 2  are
>> active and running, some events which were meant for flume agent1(based on
>> hashing & multiplexing) go to agent 2.
>>
>> Also we are performing this test on three machines. One machine for Tier
>> 1( lets say machine A) and two machines(lets say machine B & C) for Tier 2.
>> In Tier 2 for flume agent on machine B, the machine C acts as the failover
>> backup and for flume agent on machine C, the machine B acts as the failover
>> backup.
>>
>> Any idea what could be wrong with this configuration?
>>
>> Below are the tier wise configurations:
>>
>> *Tier 1:*
>>
>> agent1tier1.sources = tcpsrc
>> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
>> avro-forward-ch02 avro-forward-ch02backup
>>
>> agent1tier1.channels = channelbucket01 channelbucket02
>> agent1tier1.channels.channelbucket01.type = file
>> agent1tier1.channels.channelbucket02.type = file
>>
>>
>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>> agent1tier1.sources.tcpsrc.type = syslogtcp
>> agent1tier1.sources.tcpsrc.port = 5149
>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>> agent1tier1.sources.tcpsrc.interceptors=i1
>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>
>>
>>
>>
>> #################### INTERCEPTOR ##############################
>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
>> bucket01,bucket02
>>
>>
>>
>> #################### END OF INTERCEPTOR ##############################
>>
>>
>>
>> ####################### SELECTOR ###########################
>>
>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>> agent1tier1.sources.tcpsrc.selector.header = bucket
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>
>> ##################### END OF SELECTOR #############################
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>> agent1tier1.channels.channelbucket01.checkpointDir =
>> /home/flume/channelbucket01/file-channel/checkpoint
>> agent1tier1.channels.channelbucket01.dataDirs =
>> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>
>> agent1tier1.channels.channelbucket02.checkpointDir =
>> /home/flume/channelbucket02/file-channel/checkpoint
>> agent1tier1.channels.channelbucket02.dataDirs =
>> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>>
>>
>>
>>
>> ################## CHANNELS CAPACITY ############################
>>
>>
>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>
>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>
>>
>> ################## END OF CHANNELS CAPACITY ############################
>>
>>
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01.type = avro
>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02.type = avro
>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch1
>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
>> avro-forward-ch01backup
>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
>> = 10
>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch2
>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
>> avro-forward-ch02backup
>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
>> = 11
>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>
>>
>>
>> *Tier 2:*
>>
>> tier2.sources  = avro-AppSrv-source
>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>> tier2.channels = channelconv channelimp channelclk channelrt
>> channelhdfsrt channelhdfsdel
>> tier2.channels.channelimp.type = file
>> tier2.channels.channelconv.type = file
>> tier2.channels.channelclk.type = file
>> tier2.channels.channelrt.type = file
>> tier2.channels.channelhdfsrt.type = file
>> tier2.channels.channelhdfsdel.type = file
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> # properties of avro-AppSrv-source
>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
>> channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.sources.avro-AppSrv-source.type = avro
>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>> tier2.sources.avro-AppSrv-source.port = 10000
>>
>>
>>
>>
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
>> channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
>> channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
>> channelhdfsdel
>>
>> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
>> channelhdfsrt
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>
>>
>>
>> tier2.sinks.impsink.type = file_roll
>> tier2.sinks.impsink.channel = channelimp
>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>> tier2.sinks.impsink.sink.rollInterval=60
>>
>> tier2.sinks.convsink.type = file_roll
>> tier2.sinks.convsink.channel = channelconv
>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>> tier2.sinks.convsink.sink.rollInterval=60
>>
>> tier2.sinks.clksink.type = file_roll
>> tier2.sinks.clksink.channel = channelclk
>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>> tier2.sinks.clksink.sink.rollInterval=60
>>
>>
>> tier2.sinks.rtsink.type = file_roll
>> tier2.sinks.rtsink.channel = channelrt
>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>> tier2.sinks.rtsink.sink.rollInterval=60
>>
>>
>> #################### CHANNELS ##############################
>>
>> tier2.channels.channelimp.checkpointDir =
>> /home/flume/channelimp/file-channel/checkpoint
>> tier2.channels.channelimp.dataDirs =
>> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>
>>
>> tier2.channels.channelclk.checkpointDir =
>> /home/flume/channelclk/file-channel/checkpoint
>> tier2.channels.channelclk.dataDirs =
>> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>
>> tier2.channels.channelconv.checkpointDir =
>> /home/flume/channelconv/file-channel/checkpoint
>> tier2.channels.channelconv.dataDirs =
>> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>
>> tier2.channels.channelrt.checkpointDir =
>> /home/flume/channelrt/file-channel/checkpoint
>> tier2.channels.channelrt.dataDirs =
>> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsrt.checkpointDir =
>> /home/flume/channelhdfsrt/file-channel/checkpoint
>> tier2.channels.channelhdfsrt.dataDirs =
>> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsdel.checkpointDir =
>> /home/flume/channelhdfsdel/file-channel/checkpoint
>> tier2.channels.channelhdfsdel.dataDirs =
>> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>> tier2.sinks.hdfssinkrt.type = hdfs
>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>> tier2.sinks.hdfssinkrt.hdfs.path =
>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>
>>
>> tier2.sinks.hdfssinkdel.type = hdfs
>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>> tier2.sinks.hdfssinkdel.hdfs.path =
>> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>> #################### END OF SINKS ##############################
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>

Re: issue with failover sinks in flume

Posted by Hari Shreedharan <hs...@cloudera.com>.
Looking at your sinks, it looks like backup of one channel pushes to the
primary of the other. Your sink group priorities are inverted. The higher
the value of the priority, earlier that sink will get picked up for
processing. So sink with priority 11 gets picked up before sink with
priority 1.



On Tue, Sep 16, 2014 at 5:55 AM, Mohit Durgapal <du...@gmail.com>
wrote:

> We have a two stage topology in flume in which we are in the first tier
> adding headers based on hash value of a field in the event.
> The hashing logic is added in the interceptor in Tier 1 of flume topology
> which basically sets a header field. And then we use multiplexing to direct
> events to Tier 2  based on that header field through selector.
>  In the second tier we are storing the events locally using file_roll and
> storing the same events in hdfs also.
>
> Everything works fine when we are not using the failover sinks. When we
> add the failover sink configuration in the first tier our hashing logic
> gets overriden. That means even when all the machines in our Tier 2  are
> active and running, some events which were meant for flume agent1(based on
> hashing & multiplexing) go to agent 2.
>
> Also we are performing this test on three machines. One machine for Tier
> 1( lets say machine A) and two machines(lets say machine B & C) for Tier 2.
> In Tier 2 for flume agent on machine B, the machine C acts as the failover
> backup and for flume agent on machine C, the machine B acts as the failover
> backup.
>
> Any idea what could be wrong with this configuration?
>
> Below are the tier wise configurations:
>
> *Tier 1:*
>
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks =  avro-forward-ch01 avro-forward-ch01backup
> avro-forward-ch02 avro-forward-ch02backup
>
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
>
>
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors=i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>
>
>
>
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets =
> bucket01,bucket02
>
>
>
> #################### END OF INTERCEPTOR ##############################
>
>
>
> ####################### SELECTOR ###########################
>
> agent1tier1.sources.tcpsrc.selector.type=multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>
> ##################### END OF SELECTOR #############################
>
>
>
> #################### CHANNELS ##############################
>
> agent1tier1.channels.channelbucket01.checkpointDir =
> /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs =
> /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>
> agent1tier1.channels.channelbucket02.checkpointDir =
> /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs =
> /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>
>
> #################### CHANNELS ##############################
>
>
>
>
>
>
> ################## CHANNELS CAPACITY ############################
>
>
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
>
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
>
>
> ################## END OF CHANNELS CAPACITY ############################
>
>
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>
>
>
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01
> avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup
> = 10
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
>
>
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02
> avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup
> = 11
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>
>
>
> *Tier 2:*
>
> tier2.sources  = avro-AppSrv-source
> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt
> channelhdfsdel
> tier2.channels.channelimp.type = file
> tier2.channels.channelconv.type = file
> tier2.channels.channelclk.type = file
> tier2.channels.channelrt.type = file
> tier2.channels.channelhdfsrt.type = file
> tier2.channels.channelhdfsdel.type = file
>
> # For each source, channel, and sink, set
> # standard properties.
> # properties of avro-AppSrv-source
> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp
> channelclk channelrt channelhdfsrt channelhdfsdel
> tier2.sources.avro-AppSrv-source.type = avro
> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
> tier2.sources.avro-AppSrv-source.port = 10000
>
>
>
>
>
>
> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
> tier2.sources.avro-AppSrv-source.selector.header = rectype
> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp
> channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk
> channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv
> channelhdfsdel
>
> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt
> channelhdfsrt
>
>
> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>
>
>
> tier2.sinks.impsink.type = file_roll
> tier2.sinks.impsink.channel = channelimp
> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
> tier2.sinks.impsink.sink.rollInterval=60
>
> tier2.sinks.convsink.type = file_roll
> tier2.sinks.convsink.channel = channelconv
> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
> tier2.sinks.convsink.sink.rollInterval=60
>
> tier2.sinks.clksink.type = file_roll
> tier2.sinks.clksink.channel = channelclk
> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
> tier2.sinks.clksink.sink.rollInterval=60
>
>
> tier2.sinks.rtsink.type = file_roll
> tier2.sinks.rtsink.channel = channelrt
> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
> tier2.sinks.rtsink.sink.rollInterval=60
>
>
> #################### CHANNELS ##############################
>
> tier2.channels.channelimp.checkpointDir =
> /home/flume/channelimp/file-channel/checkpoint
> tier2.channels.channelimp.dataDirs =
> /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>
>
> tier2.channels.channelclk.checkpointDir =
> /home/flume/channelclk/file-channel/checkpoint
> tier2.channels.channelclk.dataDirs =
> /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>
> tier2.channels.channelconv.checkpointDir =
> /home/flume/channelconv/file-channel/checkpoint
> tier2.channels.channelconv.dataDirs =
> /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>
> tier2.channels.channelrt.checkpointDir =
> /home/flume/channelrt/file-channel/checkpoint
> tier2.channels.channelrt.dataDirs =
> /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>
> tier2.channels.channelhdfsrt.checkpointDir =
> /home/flume/channelhdfsrt/file-channel/checkpoint
> tier2.channels.channelhdfsrt.dataDirs =
> /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>
> tier2.channels.channelhdfsdel.checkpointDir =
> /home/flume/channelhdfsdel/file-channel/checkpoint
> tier2.channels.channelhdfsdel.dataDirs =
> /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>
>
>
>
> #################### CHANNELS ##############################
>
>
> tier2.sinks.hdfssinkrt.type = hdfs
> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
> tier2.sinks.hdfssinkrt.hdfs.path =
> hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
> # Roll based on the block size only
> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>
>
> tier2.sinks.hdfssinkdel.type = hdfs
> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
> tier2.sinks.hdfssinkdel.hdfs.path =
> hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
> # Roll based on the block size only
> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
> #################### END OF SINKS ##############################
>
>
>
>
>
>
>
>
>
>