Posted to user@flume.apache.org by David Quigley <dq...@gmail.com> on 2013/04/22 17:11:55 UTC
Fwd: HBase Sink Reliability
Hi,
I am using Flume to write events from a web server to both HDFS and HBase. All
events are being written to HDFS, but only about half are making it into
HBase. Is there anything in my configuration that could be causing the
issue? I have both the HDFS and HBase sinks reading from the same File Channel.
Is it better to have one channel per sink?
Thanks,
Dave
# flume config on web server
agent.sources = sourceLog
agent.sources.sourceLog.type = exec
agent.sources.sourceLog.command = tail -F /var/log/clickServer/clicks_out
agent.sources.sourceLog.batchSize = 100
agent.sources.sourceLog.channels = fileChannel
agent.sources.sourceLog.interceptors = itime ihost idatatype idataparent
agent.sources.sourceLog.interceptors.itime.type = timestamp
agent.sources.sourceLog.interceptors.ihost.type = host
agent.sources.sourceLog.interceptors.ihost.useIP = false
agent.sources.sourceLog.interceptors.ihost.hostHeader = host
agent.sources.sourceLog.interceptors.idatatype.type = static
agent.sources.sourceLog.interceptors.idatatype.key = data_type
agent.sources.sourceLog.interceptors.idatatype.value = clicks
agent.sources.sourceLog.interceptors.idataparent.type = static
agent.sources.sourceLog.interceptors.idataparent.key = data_parent
agent.sources.sourceLog.interceptors.idataparent.value = *
agent.channels = fileChannel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 100
agent.channels.fileChannel.checkpointDir = /opt/flume/file-channel/checkpoint
agent.channels.fileChannel.dataDirs = /opt/flume/file-channel/data
agent.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 AvroSink_backup_3
agent.sinks.AvroSink_main.type = avro
agent.sinks.AvroSink_main.channel = fileChannel
agent.sinks.AvroSink_main.hostname = *
agent.sinks.AvroSink_main.port = 35873
agent.sinks.AvroSink_main.batchSize = 100
agent.sinks.AvroSink_backup_1.type = avro
agent.sinks.AvroSink_backup_1.channel = fileChannel
agent.sinks.AvroSink_backup_1.hostname = *
agent.sinks.AvroSink_backup_1.port = 35873
agent.sinks.AvroSink_backup_1.batchSize = 100
agent.sinks.AvroSink_backup_2.type = avro
agent.sinks.AvroSink_backup_2.channel = fileChannel
agent.sinks.AvroSink_backup_2.hostname = *
agent.sinks.AvroSink_backup_2.port = 35873
agent.sinks.AvroSink_backup_2.batchSize = 100
agent.sinks.AvroSink_backup_3.type = avro
agent.sinks.AvroSink_backup_3.channel = fileChannel
agent.sinks.AvroSink_backup_3.hostname = *
agent.sinks.AvroSink_backup_3.port = 35873
agent.sinks.AvroSink_backup_3.batchSize = 100
agent.sinkgroups = failover
agent.sinkgroups.failover.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 AvroSink_backup_3
agent.sinkgroups.failover.processor.type = failover
agent.sinkgroups.failover.processor.priority.AvroSink_main = 10
agent.sinkgroups.failover.processor.priority.AvroSink_backup_1 = 5
agent.sinkgroups.failover.processor.priority.AvroSink_backup_2 = 3
agent.sinkgroups.failover.processor.priority.AvroSink_backup_3 = 1
agent.sinkgroups.failover.processor.maxpenalty = 10000
# flume config on hadoop cluster
collector.sources=AvroIn
collector.sources.AvroIn.type=avro
collector.sources.AvroIn.bind=0.0.0.0
collector.sources.AvroIn.port=35873
collector.sources.AvroIn.channels=fileChannel
collector.channels=fileChannel
collector.channels.fileChannel.type=FILE
collector.channels.fileChannel.capacity=1000
collector.channels.fileChannel.checkpointDir=~/.flume/file-channel/checkpoint_%{data_type}
collector.channels.fileChannel.dataDirs=~/.flume/file-channel/data_%{data_type}
collector.sinks=hbaseSink hdfsSink
collector.sinks.hbaseSink.type=org.apache.flume.sink.hbase.AsyncHBaseSink
collector.sinks.hbaseSink.channel=fileChannel
collector.sinks.hbaseSink.table=clicks
collector.sinks.hbaseSink.columnFamily=data
collector.sinks.hbaseSink.batchSize=100
collector.sinks.hbaseSink.serializer=com.*.serializer.HBaseClickSerializer
collector.sinks.hbaseSink.serializer.incrementColumn=icol
collector.sinks.hdfsSink.type=hdfs
collector.sinks.hdfsSink.channel=fileChannel
collector.sinks.hdfsSink.hdfs.path=/data/%{data_parent}/%{data_type}/month=%Y-%m/day=%d
collector.sinks.hdfsSink.hdfs.filePrefix=%{data_parent}_%{data_type}_%Y-%m-%d_%{host}
collector.sinks.hdfsSink.hdfs.timeZone=America/Los_Angeles
collector.sinks.hdfsSink.hdfs.fileType=DataStream
collector.sinks.hdfsSink.hdfs.writeFormat=Text
collector.sinks.hdfsSink.hdfs.rollSize=67100000
collector.sinks.hdfsSink.hdfs.rollCount=0
collector.sinks.hdfsSink.hdfs.rollInterval=3600
Re: HBase Sink Reliability
Posted by Jeff Lord <jl...@cloudera.com>.
Mike Percy contributed a most excellent blog post on this topic.
Have you had a chance to read over this?
https://blogs.apache.org/flume/entry/flume_performance_tuning_part_1
"Tuning the batch size trades throughput vs. latency and duplication under
failure. With a small batch size, throughput decreases, but the risk of
event duplication is reduced if a failure were to occur. With a large batch
size, you get much better throughput, but increased latency, and in the
case of a transaction failure, the number of possible duplicates increases."
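As a rough illustration of the trade-off quoted above, a lower-latency variant of the configs in this thread might shrink the batch size at each hop. The value 10 is an arbitrary assumption chosen for illustration, not a measured recommendation; the component names are the ones from the original post.

```properties
# Sketch only: 10 is an illustrative value, not a tuned one.
# Smaller batches commit sooner (lower latency) at the cost of throughput.
agent.sources.sourceLog.batchSize = 10
# The Flume user guide lists the Avro sink's batch property as "batch-size".
agent.sinks.AvroSink_main.batch-size = 10
collector.sinks.hbaseSink.batchSize = 10
# A channel's transactionCapacity has to be at least as large as the biggest
# batch that any source or sink moves through it in one transaction.
agent.channels.fileChannel.transactionCapacity = 100
```

Any change like this would need to be checked against real traffic, since smaller batches mean more transactions per event on the file channel.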
On Wed, Apr 24, 2013 at 11:04 PM, David Quigley <dq...@gmail.com> wrote:
> Thanks all,
>
> Added a dedicated channel each for HDFS and HBase, and all events are
> making it into their sinks now.
>
> What is the best tuning strategy for getting events from an exec source ->
> avro sink -> avro source -> hbase sink with the least amount of latency?
> Will batch size and transaction size have any effect on this latency?
>
> Thanks again
Re: HBase Sink Reliability
Posted by David Quigley <dq...@gmail.com>.
Thanks all,
Added a dedicated channel each for HDFS and HBase, and all events are
making it into their sinks now.
What is the best tuning strategy for getting events from an exec source ->
avro sink -> avro source -> hbase sink with the least amount of latency?
Will batch size and transaction size have any effect on this latency?
Thanks again
On Mon, Apr 22, 2013 at 10:58 AM, Israel Ekpo <is...@aicer.org> wrote:
> David,
>
> In addition to what has already been said, if you take a look at your
> flume log files, you should be able to see exception messages that explain
> why this is happening.
Re: HBase Sink Reliability
Posted by Israel Ekpo <is...@aicer.org>.
David,
In addition to what has already been said, if you take a look at your Flume
log files, you should be able to see exception messages that explain why
this is happening.
Re: HBase Sink Reliability
Posted by Brock Noland <br...@cloudera.com>.
Also, the below doesn't make sense unless you are generating the config
yourself. The file channel doesn't do the kind of path manipulation that
the hdfs sink does...
collector.channels.fileChannel.checkpointDir=~/.flume/file-channel/checkpoint_%{data_type}
collector.channels.fileChannel.dataDirs=~/.flume/file-channel/data_%{data_type}
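To illustrate the point above: since the file channel treats these values as literal paths, a config would normally spell the directories out in full. The paths below are hypothetical placeholders, not the poster's actual layout.

```properties
# Sketch only: literal directories with no %{header} escapes (paths are assumed).
collector.channels.fileChannel.checkpointDir = /var/lib/flume/file-channel/checkpoint_clicks
collector.channels.fileChannel.dataDirs = /var/lib/flume/file-channel/data_clicks
```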
On Mon, Apr 22, 2013 at 10:54 AM, Jeff Lord <jl...@cloudera.com> wrote:
> Hi Dave,
>
> You are on the right track with your thoughts here.
> The best way to ensure all events are successfully delivered to HBase as
> well would be to use a separate channel for the HBase sink.
>
> -Jeff
--
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
Re: HBase Sink Reliability
Posted by Jeff Lord <jl...@cloudera.com>.
Hi Dave,
You are on the right track with your thoughts here.
The best way to ensure all events are successfully delivered to HBase as
well would be to use a separate channel for the HBase sink.
-Jeff
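A minimal sketch of what the separate-channel layout could look like on the collector, assuming the source, sink, and agent names from the original post. The channel names hdfsChannel and hbaseChannel and the directory paths are made up for illustration. Sinks that drain the same channel each take different events, whereas a source attached to several channels copies every event into each of them (the replicating selector is the default and is written out here only for clarity).

```properties
# Sketch only: channel names and directories are assumptions.
collector.sources.AvroIn.channels = hdfsChannel hbaseChannel
collector.sources.AvroIn.selector.type = replicating

collector.channels = hdfsChannel hbaseChannel

# Each file channel needs its own checkpoint and data directories.
collector.channels.hdfsChannel.type = file
collector.channels.hdfsChannel.checkpointDir = /var/lib/flume/hdfsChannel/checkpoint
collector.channels.hdfsChannel.dataDirs = /var/lib/flume/hdfsChannel/data

collector.channels.hbaseChannel.type = file
collector.channels.hbaseChannel.checkpointDir = /var/lib/flume/hbaseChannel/checkpoint
collector.channels.hbaseChannel.dataDirs = /var/lib/flume/hbaseChannel/data

# Each sink now drains its own channel, so both see every event.
collector.sinks.hdfsSink.channel = hdfsChannel
collector.sinks.hbaseSink.channel = hbaseChannel
```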