Posted to user@flume.apache.org by Aiman Najjar <na...@gmail.com> on 2015/08/22 00:05:16 UTC

HDFS Downtime tolerance

Hello,

I'm trying to set up a Flume pipeline that tolerates long periods of HDFS
downtime. Here is what my current configuration looks like (note that agent1
here receives data via the netcat interface; in my prod setup it receives
Avro data from an external system):

################## FLUME.CONF BEGIN ##################

####### AGENT 1 : durable_channel_1 (will always be up) ########
durable_channel_1.sources = click-source
durable_channel_1.sinks = forward-sink
durable_channel_1.channels = file-channel

### Source Definitions
durable_channel_1.sources.click-source.type = netcat
durable_channel_1.sources.click-source.bind = agent1
durable_channel_1.sources.click-source.port = 44445
durable_channel_1.sources.click-source.channels = file-channel
durable_channel_1.sources.click-source.max-line-length = 2000

### Channel Definitions
durable_channel_1.channels.file-channel.type = file
durable_channel_1.channels.file-channel.capacity = 6000000
durable_channel_1.channels.file-channel.transactionCapacity = 10000

### Sink Definitions
durable_channel_1.sinks.forward-sink.channel = file-channel
durable_channel_1.sinks.forward-sink.type = avro
durable_channel_1.sinks.forward-sink.hostname = vm-cluster-node3
durable_channel_1.sinks.forward-sink.port = 57938


####### AGENT 2 : stream_persist (will be brought down for long periods and then back online) ########
stream_persist.sources = durable-collection-source
stream_persist.sinks = hdfs-sink
stream_persist.channels = mem-channel

### Source Definitions
stream_persist.sources.durable-collection-source.type = avro
stream_persist.sources.durable-collection-source.bind = vm-cluster-node3
stream_persist.sources.durable-collection-source.port = 57938
stream_persist.sources.durable-collection-source.channels = mem-channel

### Channel Definitions
stream_persist.channels.mem-channel.type = memory
stream_persist.channels.mem-channel.capacity = 6000000
stream_persist.channels.mem-channel.transactionCapacity = 10000


### Sink Definitions
stream_persist.sinks.hdfs-sink.channel = mem-channel
stream_persist.sinks.hdfs-sink.type = com.mycompany.flume.MyCustomHDFSSink
stream_persist.sinks.hdfs-sink.format = hdfs

################## FLUME.CONF END ##################

hdfs-sink is a custom HDFS sink that I built; it persists to HDFS after
doing some real-time processing.

In the configuration above, shouldn't agent1 resend all the backlog
accumulated while agent2 was down? In my case it seems that agent1 only
persists the events on disk and does not resend them once agent2 is back
and the connection between the two agents is re-established.

Also, a separate but related question: in my custom hdfs-sink, is throwing
an exception sufficient to indicate that the event was not processed
successfully? I would like to propagate back to agent1 that agent2 has
failed to persist to HDFS so that agent1 resends the data (for example, if
an HDFS write has failed, agent1 should resend that event). Currently I'm
only throwing an exception, but this does not seem to trigger a retry.

Thank you

Re: HDFS Downtime tolerance

Posted by Johny Rufus <jr...@cloudera.com>.
For your second question: when agent2's HDFS sink throws an exception, it
won't be propagated to agent1. It won't even be propagated to agent2's
source. But agent2's channel will start filling up, and once it reaches max
capacity agent2's source cannot take any more data, so it will throw an
exception to agent1.
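
To make the sequence above concrete, here is a minimal sketch in plain Java.
It is a toy model, not Flume's API: ToyChannel, its methods, and
acceptedWhileSinkFails are hypothetical names invented for illustration, and
the model assumes the failing sink's take is rolled back so the event returns
to the channel. It shows only that a sink that keeps failing never frees
space, so the source's puts start being rejected once the channel is full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not Flume's API) of a bounded channel whose takes can be rolled back. */
public class ChannelRollbackSketch {

    /** Bounded FIFO standing in for a channel; all names here are hypothetical. */
    static final class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final int capacity;

        ToyChannel(int capacity) {
            this.capacity = capacity;
        }

        /** Source side: returns false when the channel is full. */
        boolean put(String event) {
            if (queue.size() >= capacity) {
                return false;
            }
            queue.addLast(event);
            return true;
        }

        /** Sink side: removes and returns the head event, or null if empty. */
        String take() {
            return queue.pollFirst();
        }

        /** Rollback: returns a taken event to the head so it is retried first. */
        void rollback(String event) {
            queue.addFirst(event);
        }

        int size() {
            return queue.size();
        }
    }

    /**
     * Offers `offered` events while the sink fails every delivery attempt.
     * Returns how many events the source was able to put before being rejected.
     */
    static int acceptedWhileSinkFails(int capacity, int offered) {
        ToyChannel channel = new ToyChannel(capacity);
        int accepted = 0;
        for (int i = 0; i < offered; i++) {
            if (channel.put("event-" + i)) {
                accepted++;
            }
            // Sink attempt: take one event, fail to write it, roll it back.
            String taken = channel.take();
            if (taken != null) {
                channel.rollback(taken);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Capacity 3, 10 events offered: only the first 3 fit.
        System.out.println(acceptedWhileSinkFails(3, 10)); // prints 3
    }
}
```

In this model the rejected put is the only signal the upstream side ever
sees; the sink's own exception stays local to the agent.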

When that happens, agent1's sink will fail and stop draining data from its
channel, so agent1's channel will start filling up until it reaches full
capacity, after which agent1 won't accept any more data. I think this is
what is happening in your case, as the file channel is getting full.
When agent2 comes back, the avro sink of agent1 should start draining
events. If this is not happening, are there any exceptions in the logs?
Can you try restarting agent1?
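
The expected behaviour described above can be sketched as a small simulation
in plain Java. This is an illustration under stated assumptions, not Flume
code: the class name, the simulate method, and the tick-based model (one
event arriving per tick, a fixed drain rate once the downstream agent is
back) are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Toy simulation (not Flume code) of agent1's channel across a downstream outage. */
public class TwoHopBacklogSketch {

    /**
     * One event arrives per tick. During the outage nothing drains; during
     * recovery up to drainPerTick events are forwarded per tick.
     * Returns {backlog at end of outage, events rejected at ingest, backlog after recovery}.
     */
    static int[] simulate(int capacity, int outageTicks, int recoveryTicks, int drainPerTick) {
        Deque<Integer> agent1Channel = new ArrayDeque<>();
        int rejected = 0;
        int nextEvent = 0;

        // Outage: agent1's sink cannot deliver, so the channel only grows.
        for (int t = 0; t < outageTicks; t++) {
            if (agent1Channel.size() < capacity) {
                agent1Channel.addLast(nextEvent);
            } else {
                rejected++; // channel full: agent1 stops accepting data
            }
            nextEvent++;
        }
        int backlogAtEndOfOutage = agent1Channel.size();

        // Recovery: events keep arriving while the sink drains the backlog.
        for (int t = 0; t < recoveryTicks; t++) {
            if (agent1Channel.size() < capacity) {
                agent1Channel.addLast(nextEvent);
            } else {
                rejected++;
            }
            nextEvent++;
            for (int i = 0; i < drainPerTick && !agent1Channel.isEmpty(); i++) {
                agent1Channel.pollFirst();
            }
        }
        return new int[] {backlogAtEndOfOutage, rejected, agent1Channel.size()};
    }

    public static void main(String[] args) {
        // Capacity 5, outage of 8 ticks, 4 recovery ticks, drain 3 events per tick.
        System.out.println(Arrays.toString(simulate(5, 8, 4, 3))); // prints [5, 4, 0]
    }
}
```

The point of the model is the sizing consequence: anything arriving after
the channel is full is rejected at ingest, so the channel capacity has to
cover the longest outage you want to ride out.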

Thanks,
Rufus
