Posted to dev@flume.apache.org by "Jarek Jarcec Cecho (Commented) (JIRA)" <ji...@apache.org> on 2012/02/09 21:12:57 UTC

[jira] [Commented] (FLUME-952) Modifying SinkRunner to be pluggable to allow for failover/replication.

    [ https://issues.apache.org/jira/browse/FLUME-952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13204811#comment-13204811 ] 

Jarek Jarcec Cecho commented on FLUME-952:
------------------------------------------

I finally found time to study the source code and shape the idea into a more or less usable form. Please treat my comment as brainstorming, and feel free to add your own suggestions and ideas or to share your concerns and objections. I would split the task into several subtasks:

1) Add support for multiple sinks into SinkRunner
I would propose making SinkRunner aware of multiple sinks, which includes:

* The constructor should accept an array of sinks
* getSink and setSink should work with that array (and be renamed to getSinks and setSinks)
* start and stop need to work with all sinks
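
A minimal sketch of what the three points above could look like. SimpleSink and MultiSinkRunner are hypothetical stand-ins, not Flume's actual Sink or SinkRunner classes, and stopping in reverse order is my own assumption rather than part of the proposal:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: SimpleSink and MultiSinkRunner are hypothetical stand-ins,
// not Flume's real Sink or SinkRunner.
final class MultiSinkRunnerSketch {

  /** Hypothetical minimal lifecycle contract for a sink. */
  interface SimpleSink {
    void start();
    void stop();
  }

  /** Runner that owns several sinks instead of exactly one. */
  static final class MultiSinkRunner {
    private List<SimpleSink> sinks;

    MultiSinkRunner(SimpleSink... sinks) {
      setSinks(sinks);
    }

    /** Returns a copy so callers cannot mutate the runner's own list. */
    SimpleSink[] getSinks() {
      return sinks.toArray(new SimpleSink[0]);
    }

    void setSinks(SimpleSink... sinks) {
      this.sinks = new ArrayList<>(Arrays.asList(sinks));
    }

    /** Starts every sink in configuration order. */
    void start() {
      for (SimpleSink sink : sinks) {
        sink.start();
      }
    }

    /** Stops every sink in reverse order (an assumption of this sketch). */
    void stop() {
      for (int i = sinks.size() - 1; i >= 0; i--) {
        sinks.get(i).stop();
      }
    }
  }
}
```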

2) Change Sink configuration 
The current sink configuration assumes that each sink has exactly one runner. Since we would like to build the failover mechanism into the runner, one runner will have to serve multiple sinks at once. I believe the configuration needs to reflect that change as well. I would propose the following scheme:

* $host.sinks.$logicalName.runner For runner specific configuration (for example polling interval)
* $host.sinks.$logicalName.runner.selector Selector type (default, failover, loadbalancer)
* $host.sinks.$logicalName.1 First sink configuration for the runner
* $host.sinks.$logicalName.2 Second sink configuration for the runner

Example of "normal" sink configuration would be:

agent1.sinks.moo.runner.selector = default
agent1.sinks.moo.runner.polling.interval = 10
agent1.sinks.moo.1.type = hdfs
agent1.sinks.moo.1.hdfs.path = hdfs://namenode:8020/flume-ng/

Example of "failover" sink configuration:

agent2.sinks.foo.runner.selector = failover
agent2.sinks.foo.runner.failover.option1 = value
agent2.sinks.foo.runner.polling.interval = 10
agent2.sinks.foo.1.type = hdfs
agent2.sinks.foo.1.hdfs.path = hdfs://namenode:8020/flume-ng/
agent2.sinks.foo.2.type = hdfs
agent2.sinks.foo.2.hdfs.path = hdfs://namenode-on-mars:8020/flume-ng/

I believe that this change should happen in PropertiesFileConfigurationProvider.loadSinks().
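
To illustrate how keys in this scheme could be grouped, here is a rough sketch using plain java.util.Properties. SinkGroup and parse() are hypothetical names of mine; this is not the existing PropertiesFileConfigurationProvider code, only one way the key layout could be split into runner options and numbered sink options:

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Sketch only: SinkGroup and parse() are hypothetical, not Flume code.
final class SinkGroupConfigSketch {

  /** Parsed configuration for one logical sink name. */
  static final class SinkGroup {
    /** Options under <agent>.sinks.<name>.runner.* */
    final Map<String, String> runner = new TreeMap<>();
    /** Options under <agent>.sinks.<name>.<N>.*, keyed by N. */
    final Map<Integer, Map<String, String>> sinks = new TreeMap<>();
  }

  /** Groups all "<agent>.sinks." keys by logical name. */
  static Map<String, SinkGroup> parse(String agent, Properties props) {
    String prefix = agent + ".sinks.";
    Map<String, SinkGroup> groups = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (!key.startsWith(prefix)) {
        continue; // belongs to another agent or component
      }
      // Remainder has the shape <logicalName>.<runner|N>.<option>
      String[] parts = key.substring(prefix.length()).split("\\.", 3);
      if (parts.length < 3) {
        continue; // no option name, nothing to store
      }
      SinkGroup group = groups.computeIfAbsent(parts[0], n -> new SinkGroup());
      String value = props.getProperty(key);
      if (parts[1].equals("runner")) {
        group.runner.put(parts[2], value);
      } else if (parts[1].matches("\\d{1,9}")) {
        group.sinks
            .computeIfAbsent(Integer.valueOf(parts[1]), i -> new TreeMap<>())
            .put(parts[2], value);
      }
    }
    return groups;
  }
}
```

With the failover example above, parse("agent2", props) would yield one group "foo" whose runner map holds "selector", "failover.option1" and "polling.interval", and whose sinks map holds entries 1 and 2.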

3) Introduce SinkRunner selector
The sink runner selector would be an interface that chooses which sink we should try. I could imagine something like:

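public interface SinkSelector {

  // Maximum number of sinks this selector can handle.
  int supportedSinks();

  // Called on load (or change) with all available sinks.
  void setSinks(Sink[] availableSinks);

  // Returns the sink to use for the given attempt, or null when there is
  // nothing left to try. The parameter is named "attempt" because "try" is
  // a reserved word in Java. "Something" is a placeholder exception type.
  Sink chooseSink(int attempt) throws Something;
}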

supportedSinks() should return an integer constant giving the maximum number of sinks the selector supports. For example, the default selector will support only 1 sink, whereas the failover selector might support Integer.MAX_VALUE (or something more reasonable). SinkRunner.setSinks(Sink[]) should validate that the number of sinks passed in is less than or equal to SinkSelector.supportedSinks().
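
The validation described here could be as small as the following sketch. The helper name validateSinkCount and the choice of IllegalArgumentException are assumptions of mine, and the SinkSelector shown is reduced to the one method the check needs:

```java
// Sketch only: validateSinkCount is a hypothetical helper, and this
// SinkSelector is trimmed down to the single method used by the check.
final class SinkCountValidationSketch {

  interface SinkSelector {
    int supportedSinks();
  }

  /** Rejects a sink array that is larger than the selector can handle. */
  static void validateSinkCount(SinkSelector selector, Object[] sinks) {
    int max = selector.supportedSinks();
    if (sinks.length > max) {
      throw new IllegalArgumentException(
          "Selector supports at most " + max + " sink(s), but "
              + sinks.length + " were configured");
    }
  }
}
```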

setSinks() should be called once on load (and again on any change) to notify the selector about all available sinks.

chooseSink() should be called from SinkRunner.PollingRunner.run() to resolve the sink whose process() method should be called. If that sink is unable to deliver the event, PollingRunner.run() should call SinkSelector.chooseSink() again with an incremented attempt number to resolve another sink to try. SinkSelector.chooseSink() should return null when there is no other sink to try (or when it does not want to try another sink).
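
The retry loop described above might look roughly like this. It is a sketch under several assumptions: Sink here is a hypothetical one-method interface rather than Flume's, the attempt counter starts at 0, any exception from process() counts as a failed delivery, and the parameter is called "attempt" because "try" is a reserved word in Java:

```java
// Sketch only: Sink and SinkSelector are simplified, hypothetical versions
// of the interfaces discussed in this comment.
final class PollingLoopSketch {

  interface Sink {
    void process() throws Exception;
  }

  interface SinkSelector {
    /** Returns the sink for this attempt, or null to give up. */
    Sink chooseSink(int attempt);
  }

  /**
   * One polling pass. Returns true as soon as some sink processes
   * successfully, false once the selector has no further sink to offer.
   * Terminates only if the selector eventually returns null.
   */
  static boolean pollOnce(SinkSelector selector) {
    for (int attempt = 0; ; attempt++) { // assumption: attempts start at 0
      Sink sink = selector.chooseSink(attempt);
      if (sink == null) {
        return false; // nothing left to try
      }
      try {
        sink.process();
        return true;
      } catch (Exception e) {
        // Delivery failed: loop around and ask for the next candidate.
      }
    }
  }
}
```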

4) Implement default, failover and balancer SinkSelector(s)
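
As a starting point for discussion, the three selectors could be sketched as below. Everything here is hypothetical: Sink is an empty marker interface, the exception clause is left out, attempts are assumed to start at 0, the balancer is assumed to be simple round-robin, and none of it is thread-safe:

```java
// Sketch only: hypothetical selector implementations, not Flume code.
final class SelectorSketch {

  interface Sink {}

  interface SinkSelector {
    int supportedSinks();
    void setSinks(Sink[] availableSinks);
    Sink chooseSink(int attempt);
  }

  /** Exactly one sink, never retried on another. */
  static final class DefaultSelector implements SinkSelector {
    private Sink sink;
    public int supportedSinks() { return 1; }
    public void setSinks(Sink[] s) { sink = s.length > 0 ? s[0] : null; }
    public Sink chooseSink(int attempt) { return attempt == 0 ? sink : null; }
  }

  /** Tries sinks in configured order until one works. */
  static final class FailoverSelector implements SinkSelector {
    private Sink[] sinks = new Sink[0];
    public int supportedSinks() { return Integer.MAX_VALUE; }
    public void setSinks(Sink[] s) { sinks = s.clone(); }
    public Sink chooseSink(int attempt) {
      return attempt >= 0 && attempt < sinks.length ? sinks[attempt] : null;
    }
  }

  /** Rotates the first choice on every pass, then falls through the rest. */
  static final class RoundRobinSelector implements SinkSelector {
    private Sink[] sinks = new Sink[0];
    private int next;   // first choice for the next pass
    private int start;  // first choice for the current pass
    public int supportedSinks() { return Integer.MAX_VALUE; }
    public void setSinks(Sink[] s) { sinks = s.clone(); next = 0; start = 0; }
    public Sink chooseSink(int attempt) {
      if (attempt < 0 || attempt >= sinks.length) {
        return null; // every sink has been offered in this pass
      }
      if (attempt == 0) { // a new pass begins: advance the rotation
        start = next;
        next = (next + 1) % sinks.length;
      }
      return sinks[(start + attempt) % sinks.length];
    }
  }
}
```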


I'm not an expert on the Flume source code, so I would greatly appreciate any feedback. Again, please consider my comment as just brainstorming.

Jarcec
                
> Modifying SinkRunner to be pluggable to allow for failover/replication.
> -----------------------------------------------------------------------
>
>                 Key: FLUME-952
>                 URL: https://issues.apache.org/jira/browse/FLUME-952
>             Project: Flume
>          Issue Type: Brainstorming
>          Components: Sinks+Sources
>            Reporter: Juhani Connolly
>             Fix For: v1.1.0
>
>
> Implementing the failover sink runner the following was suggested:
> 1. This needs to be implemented on top of FLUME-949 which deals with removing the notion of a PollableSink altogether. As a result, the SinkRunner will become a concrete implementation that can then allow different sink handling policies - such as either a failover policy (needed for this issue), or load balancing policy (not needed for this issue). Hence the policy part needs to be pluggable rather than the sink runner itself. An example of such a construct is the ChannelSelector and ChannelProcessor implementations.
> In Flume-865 I have implemented FailoverSinkRunner as a separate runner, but I am open to the idea of making it pluggable if it makes the code more maintainable.
> As is, there are many differences between the requirements for Failover and a normal Sink runner, including configuration, initialisation, shutdown, error handling and event processing. If we were to make this pluggable, many hooks would be needed and I don't think there is that much common behavior that warrants using a pluggable system rather than just a solid base class.
> - Adding a new sink to a runner, with configuration variables(such as priority or weight)
> - Policy for handling process: should this just return a list of sinks to process like ChannelSelector and hand off the processing to Process? I think that the specific failover policy for each type of runner  will be different so this feels awkward. I would personally prefer to just pass the process call to the pluggable component and let it be responsible for calling process on the correct sinks, as well as handling errors.
> Right now I am not convinced of the need to make SinkRunner pluggable, but I would be interested to hear other people's opinions.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira