Posted to user@flume.apache.org by Chris Neal <cw...@gmail.com> on 2012/08/17 17:59:56 UTC

Failover Processor + Load Balanced Processor?

Hi all.

The User Guide talks about the various types of Sink Processors, but
doesn't say whether they can be combined.  A Failover Processor
that moves between 1..n sinks is great, as is a Load Balancer Processor
that moves between 1..n sinks, but what would be best is an agent that
can utilize both a Failover Processor AND a Load Balancer Processor!

I've created a configuration which I believe supports this, and the Agent
starts up and processes events, but I wanted to ping this group to make
sure that this configuration is really doing what I think it is doing
behind the scenes.

Comments?

# Define the sources, sinks, and channels for the agent
agent.sources = avro-instance_1-source avro-instance_2-source
agent.channels = memory-agent-channel
agent.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups = failover-sink-group lb-sink-group

# Bind sources to channels
agent.sources.avro-instance_1-source.channels = memory-agent-channel
agent.sources.avro-instance_2-source.channels = memory-agent-channel

# Define sink group for failover
agent.sinkgroups.failover-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.failover-sink-group.processor.type = failover
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_1-sink = 5
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_2-sink = 10
agent.sinkgroups.failover-sink-group.processor.maxpenalty = 10000

# Define sink group for load balancing
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
agent.sinkgroups.lb-sink-group.processor.selector = round_robin

# Bind sinks to channels
agent.sinks.avro-hdfs_1-sink.channel = memory-agent-channel
agent.sinks.avro-hdfs_2-sink.channel = memory-agent-channel

# avro-instance_1-source properties
agent.sources.avro-instance_1-source.type = exec
agent.sources.avro-instance_1-source.command = tail -F /somedir/Trans.log
agent.sources.avro-instance_1-source.restart = true
agent.sources.avro-instance_1-source.batchSize = 100

# avro-instance_2-source properties
agent.sources.avro-instance_2-source.type = exec
agent.sources.avro-instance_2-source.command = tail -F /somedir/UDXMLTrans.log
agent.sources.avro-instance_2-source.restart = true
agent.sources.avro-instance_2-source.batchSize = 100

# avro-hdfs_1-sink properties
agent.sinks.avro-hdfs_1-sink.type = avro
agent.sinks.avro-hdfs_1-sink.hostname = hdfshost1.domain.com
agent.sinks.avro-hdfs_1-sink.port = 10000

# avro-hdfs_2-sink properties
agent.sinks.avro-hdfs_2-sink.type = avro
agent.sinks.avro-hdfs_2-sink.hostname = hdfshost2.domain.com
agent.sinks.avro-hdfs_2-sink.port = 10000

# memory-agent-channel properties
agent.channels.memory-agent-channel.type = memory
agent.channels.memory-agent-channel.capacity = 20000
agent.channels.memory-agent-channel.transactionCapacity = 100

Thanks!

Re: Failover Processor + Load Balanced Processor?

Posted by Chris Neal <cw...@gmail.com>.
Nice!  Thanks :)  Will take a look.


Re: Failover Processor + Load Balanced Processor?

Posted by Juhani Connolly <ju...@cyberagent.co.jp>.
Since there was no response to this, I set up a separate ticket at
https://issues.apache.org/jira/browse/FLUME-1541 and implemented it as a 
SinkSelector for the LoadBalancingSinkProcessor.

Review can be found at https://reviews.apache.org/r/6939/

Chris: if you're interested you may want to give this a poke, see if it 
fulfills your needs. The only change in configuration needed is to 
change the selector type from "round_robin" to "round_robin_backoff"
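
For reference, the selector change described above might look like the
following in the agent's properties file. This is only a sketch: it assumes
the patch from FLUME-1541 is applied, and it reuses the lb-sink-group name
and the sink names from the original post. Released Flume versions may
expose backoff through a different property, so check the User Guide for
the version in use.

```properties
# Load-balancing sink group (group and sink names taken from the original post)
agent.sinkgroups = lb-sink-group
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
# Before the patch this value was: round_robin
# Assumption: the FLUME-1541 patch is applied, which adds round_robin_backoff
agent.sinkgroups.lb-sink-group.processor.selector = round_robin_backoff
```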



Re: Failover Processor + Load Balanced Processor?

Posted by Juhani Connolly <ju...@cyberagent.co.jp>.
I'm thinking of working on this (adding backoff semantics to the load
balancing processor).

The ticket FLUME-1488, however, refers to the load balancing RPC client
(or is it just poorly worded/unclear?). If this is in fact a separate
issue, I'll file a ticket for it.

Anyway, I'm interested in hearing thoughts on the approach. I'd have
liked to do it within the framework of the LoadBalancingSinkProcessor by
adding a new Selector; however, as it stands, the processor provides
no feedback to the selectors about whether sinks are working or not, so
this can't work.

This leaves two choices: write a new SinkProcessor or modify the 
SinkSelector interface to give it a couple of callbacks that the 
processor calls to inform the selector of trouble. This shouldn't really 
be a problem even if people have written their own selectors so long as 
they are extending AbstractSinkSelector which can stub the callbacks.

Thoughts?

On 08/18/2012 02:01 AM, Arvind Prabhakar wrote:
> Hi,
>
> FYI - the load balancing sink processor does support simple failover 
> semantics. The way it works is that if a sink is down, it will proceed 
> to the next sink in the group until all sinks are exhausted. The 
> failover sink processor on the other hand does complex failure 
> handling and back-off such as blacklisting sinks that repeatedly fail 
> etc. The issue [1] tracks enhancing this processor to support backoff 
> semantics.
>
> The one issue with your configuration that I could spot by a quick 
> glance is that you are adding your active sinks to both the sink 
> groups. This does not really work and the configuration subsystem 
> simply flags the second inclusion as a problem and ignores it. By 
> design, a sink can either be on its own or in one explicit sink group.
>
> [1] https://issues.apache.org/jira/browse/FLUME-1488
>
> Regards,
> Arvind Prabhakar
>


Re: Failover Processor + Load Balanced Processor?

Posted by Chris Neal <cw...@gmail.com>.
Understood.  Thanks so much for your time!
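
To summarize the advice in this thread as a sketch (not a tested
configuration): each sink is listed in exactly one sink group, so both avro
sinks go into a single load-balancing group, which already moves on to the
next sink when one is down. The group name lb-sink-group and the sink names
are carried over from the original post.

```properties
agent.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
# Declare a single group; a sink cannot be a member of two sink groups
agent.sinkgroups = lb-sink-group
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
agent.sinkgroups.lb-sink-group.processor.selector = round_robin
```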


Re: Failover Processor + Load Balanced Processor?

Posted by Arvind Prabhakar <ar...@apache.org>.
On Fri, Aug 17, 2012 at 10:11 AM, Chris Neal <cw...@gmail.com> wrote:

> Thanks Arvind,
>
> So in the load balanced scenario, if sink A goes down and all events go
> to sink B, does sink A's status ever get re-checked to be added back to the
> pool?  Or once it's down, it's down?
>

It does get added back for subsequent invocations. The Failover sink
processor, on the other hand, has back-off semantics that exponentially
increase the waiting period before a failed sink is retried.
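
As an illustration of the failover side, here is a minimal sketch of a
failover group that reuses the sink names from the original post. In the
failover processor, the sink with the larger priority value is tried first,
and maxpenalty caps the back-off period for a failed sink, in milliseconds.
The group name and the values are examples only.

```properties
agent.sinkgroups = failover-sink-group
agent.sinkgroups.failover-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.failover-sink-group.processor.type = failover
# Larger value = higher priority, so avro-hdfs_2-sink is preferred here
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_1-sink = 5
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_2-sink = 10
# Upper bound on the back-off period for a failed sink, in milliseconds
agent.sinkgroups.failover-sink-group.processor.maxpenalty = 10000
```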

Regards,
Arvind Prabhakar



Re: Failover Processor + Load Balanced Processor?

Posted by Chris Neal <cw...@gmail.com>.
Thanks Arvind,

So in the load balanced scenario, if sink A goes down and all events go to
sink B, does sink A's status ever get re-checked to be added back to the
pool?  Or once it's down, it's down?

Chris

On Fri, Aug 17, 2012 at 12:01 PM, Arvind Prabhakar <ar...@apache.org>wrote:

> Hi,
>
> FYI - the load balancing sink processor does support simple failover
> semantics. The way it works is that if a sink is down, it will proceed to
> the next sink in the group until all sinks are exhausted. The failover sink
> processor on the other hand does complex failure handling and back-off such
> as blacklisting sinks that repeatedly fail etc. The issue [1] tracks
> enhancing this processor to support backoff semantics.
>
> The one issue with your configuration that I could spot by a quick glance
> is that you are adding your active sinks to both the sink groups. This does
> not really work and the configuration subsystem simply flags the second
> inclusion as a problem and ignores it. By design, a sink can either be on
> its own or in one explicit sink group.
>
> [1] https://issues.apache.org/jira/browse/FLUME-1488
>
> Regards,
> Arvind Prabhakar
>

Re: Failover Processor + Load Balanced Processor?

Posted by Arvind Prabhakar <ar...@apache.org>.
Hi,

FYI - the load balancing sink processor does support simple failover
semantics: if a sink is down, it proceeds to the next sink in the group
until all sinks are exhausted. The failover sink processor, on the other
hand, does more complex failure handling and back-off, such as blacklisting
sinks that repeatedly fail. Issue [1] tracks enhancing the load balancing
processor to support backoff semantics.
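
As a rough sketch of the load balancing case, using the sink and group
names from the original post, a single group might look like the fragment
below. It is an illustration under those assumptions, not a tested
configuration:

```properties
# One sink group, load balanced across both sinks
agent.sinkgroups = lb-sink-group
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
agent.sinkgroups.lb-sink-group.processor.selector = round_robin
```

With a group like this, events alternate between the two sinks, and if one
sink is down the processor moves on to the next sink in the group.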

The one issue with your configuration that I could spot at a quick glance
is that you are adding your active sinks to both sink groups. This does
not work: the configuration subsystem flags the second inclusion as a
problem and ignores it. By design, a sink can either be on its own or in
one explicit sink group.
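
To illustrate the one-group-per-sink rule, here is a sketch that uses both
processor types with disjoint sets of sinks. The sinks avro-hdfs_3-sink and
avro-hdfs_4-sink are hypothetical; they are not part of the original
configuration and would need their own type, hostname, port, and channel
settings:

```properties
# avro-hdfs_3-sink and avro-hdfs_4-sink are hypothetical additions
agent.sinks = avro-hdfs_1-sink avro-hdfs_2-sink avro-hdfs_3-sink avro-hdfs_4-sink
agent.sinkgroups = failover-sink-group lb-sink-group

# Failover group: the sink with the larger priority value is tried first
agent.sinkgroups.failover-sink-group.sinks = avro-hdfs_1-sink avro-hdfs_2-sink
agent.sinkgroups.failover-sink-group.processor.type = failover
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_1-sink = 5
agent.sinkgroups.failover-sink-group.processor.priority.avro-hdfs_2-sink = 10
agent.sinkgroups.failover-sink-group.processor.maxpenalty = 10000

# Load balancing group: uses different sinks than the failover group
agent.sinkgroups.lb-sink-group.sinks = avro-hdfs_3-sink avro-hdfs_4-sink
agent.sinkgroups.lb-sink-group.processor.type = load_balance
agent.sinkgroups.lb-sink-group.processor.selector = round_robin
```

Each sink appears in exactly one group, so neither group's definition
should be flagged as a duplicate inclusion.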

[1] https://issues.apache.org/jira/browse/FLUME-1488

Regards,
Arvind Prabhakar
