Posted to dev@flume.apache.org by Juhani Connolly <ju...@gmail.com> on 2012/02/03 09:53:38 UTC

Review Request: FLUME-865 creating a failover sink runner

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

Review request for Flume.


Summary
-------

This includes the changes from FLUME-945 and is thus subject to change along with it. I have a separate patch containing only the diff between FLUME-945 and FLUME-865 if anyone wants it.

This patch adds a failover sink runner along with a SinkRunnerFactory. Configuration loading is modified to read the runner configuration and pass it to the SinkRunnerFactory.
To use failover, set the runner type to failover on every participating sink. In addition, each sink assigned to a runner must set the same runner.name and a unique runner.priority.
The runner polls only the highest-priority live sink. If that sink fails by throwing EventDeliveryException, the runner moves it to a pool of dead sinks. The dead sinks are revived only once all sinks have been exhausted.
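The selection rule described above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the FailoverSinkRunner code from the patch: `SimpleSink`, `DeliveryException` and `FailoverSketch` are hypothetical stand-ins for a Flume sink, EventDeliveryException and the runner, and a higher priority value is taken to mean a higher priority, as in the patch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for Flume's EventDeliveryException.
class DeliveryException extends Exception {
    DeliveryException(String msg) { super(msg); }
}

// Hypothetical stand-in for a pollable sink.
interface SimpleSink {
    String getName();
    void process() throws DeliveryException;
}

class FailoverSketch {
    // A sink together with its configured priority (higher value = preferred).
    static final class Entry {
        final SimpleSink sink;
        final int priority;
        Entry(SimpleSink sink, int priority) { this.sink = sink; this.priority = priority; }
    }

    // Live sinks, highest priority first.
    private final PriorityQueue<Entry> live =
        new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.priority).reversed());
    // Sinks that have thrown; they are not polled until every sink has failed.
    private final List<Entry> dead = new ArrayList<>();

    void addSink(SimpleSink sink, int priority) {
        live.add(new Entry(sink, priority));
    }

    /** Polls the best live sink; on failure moves it to the dead pool and tries the next. */
    String processOnce() throws DeliveryException {
        while (true) {
            if (live.isEmpty()) {
                if (dead.isEmpty()) {
                    throw new DeliveryException("No sinks configured");
                }
                // Every sink has failed: revive them all for the next attempt.
                live.addAll(dead);
                dead.clear();
                throw new DeliveryException("All sinks failed; dead sinks revived");
            }
            Entry best = live.peek();
            try {
                best.sink.process();
                return best.sink.getName();
            } catch (DeliveryException e) {
                live.poll();      // remove the failed head of the queue
                dead.add(best);   // and park it in the dead pool
            }
        }
    }
}
```

A recovered sink is not retried until every other sink has also failed, because the dead pool is only drained when no live sinks remain. The sketch signals that round with an exception; whether the real runner does the same is an assumption.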

Some issues remain outstanding:
- Not all sinks throw EventDeliveryException, or have a clear point at which they should be considered "dead". Pairing such sinks with the failover runner will not result in failover.
- Sinks have no clear mechanism for testing their liveness, other than perhaps polling LifecycleState. However, for most sinks the LifecycleState tends to remain START even when the sink is unable to do anything (e.g. an Avro sink failing to connect).
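To illustrate the first issue: a failover runner can only react to failures that a sink actually reports. The sketch below contrasts a sink that swallows a send error with one that surfaces it. `Transport`, `DeliveryFailure` and both sink classes are hypothetical and do not reflect how AvroSink or any other Flume sink is implemented.

```java
import java.io.IOException;

// Hypothetical stand-in for Flume's EventDeliveryException.
class DeliveryFailure extends Exception {
    DeliveryFailure(String msg, Throwable cause) { super(msg, cause); }
}

// Hypothetical transport, e.g. an RPC client that may lose its connection.
interface Transport {
    void send(String event) throws IOException;
}

class SwallowingSink {
    private final Transport transport;
    SwallowingSink(Transport transport) { this.transport = transport; }

    /** Logs and drops the error, so a failover runner never sees a failure. */
    void process(String event) {
        try {
            transport.send(event);
        } catch (IOException e) {
            System.err.println("send failed: " + e.getMessage());
        }
    }
}

class FailoverAwareSink {
    private final Transport transport;
    FailoverAwareSink(Transport transport) { this.transport = transport; }

    /** Surfaces the error as an exception, which is what the failover runner keys on. */
    void process(String event) throws DeliveryFailure {
        try {
            transport.send(event);
        } catch (IOException e) {
            throw new DeliveryFailure("Failed to send event", e);
        }
    }
}
```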


This addresses bug FLUME-865.
    https://issues.apache.org/jira/browse/FLUME-865


Diffs
-----

  flume-ng-core/src/main/java/org/apache/flume/PollableSink.java e86cc59 
  flume-ng-core/src/main/java/org/apache/flume/Sink.java ab9b63c 
  flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java 7f5cb17 
  flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java 90b8a86 
  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkRunner.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java 7c87bf7 
  flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java 5722cd1 
  flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java afcf1c3 
  flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java 47addd1 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkRunnerFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java 9718491 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkRunner.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java bea0a3c 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java f1e23bf 
  flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java 716bbf5 
  flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ff20beb 
  flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java 5fc3943 

Diff: https://reviews.apache.org/r/3750/diff


Testing
-------

All unit tests pass.
A new unit test was added. It creates a memory channel and a failover runner with three sinks that each consume a preset number of events and then throw EventDeliveryException. Events are fed into the channel, and assertions check that the runner fails over to the correct sinks. This test also passes.

I also tried running this on a real cluster: one master source using a failover runner to three Avro sinks, each feeding a separate agent that I would kill off to test the failover. Unfortunately, because AvroSink does not throw EventDeliveryException, this test could not be completed successfully, and I felt that modifying AvroSink was beyond the scope of this ticket.
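The shape of that unit test can be sketched without Flume or JUnit. Here a `Deque` stands in for the memory channel, and the hypothetical `CountingSink` accepts a preset number of events and then fails on every call; the sinks are assumed to be passed in priority order. This is an approximation of the test described above, not TestFailoverSinkRunner itself.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for EventDeliveryException.
class DeliveryFailed extends Exception {
    DeliveryFailed(String msg) { super(msg); }
}

// Test sink that accepts a fixed number of events, then fails on every call.
class CountingSink {
    final String name;
    private int remaining;
    final List<String> received = new ArrayList<>();

    CountingSink(String name, int capacity) {
        this.name = name;
        this.remaining = capacity;
    }

    void deliver(String event) throws DeliveryFailed {
        if (remaining == 0) {
            throw new DeliveryFailed(name + " exhausted");
        }
        remaining--;
        received.add(event);
    }
}

class FailoverTestSketch {
    /** Drains the channel, always using the first sink (in priority order) that has not failed. */
    static void drain(Deque<String> channel, List<CountingSink> byPriority) {
        int current = 0;
        while (!channel.isEmpty() && current < byPriority.size()) {
            String event = channel.peek();
            try {
                byPriority.get(current).deliver(event);
                channel.poll(); // remove the event only after a successful delivery
            } catch (DeliveryFailed e) {
                current++;      // fail over to the next sink; the event stays queued
            }
        }
    }
}
```

Events are only removed from the queue after a successful delivery, which loosely mirrors a channel transaction being rolled back when a sink throws.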


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.

> On 2012-02-20 02:38:52, Arvind Prabhakar wrote:
> > flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java, line 28
> > <https://reviews.apache.org/r/3750/diff/5/?file=75935#file75935line28>
> >
> >     Since the sinks are named components anyway, the passing of Map<> keyed on the names is not necessary. Using a Map here makes ambiguous the order of declaration which may be of some consequence to the sink processor. Hence I suggest using a List instead.
> >     
> >     We can have an AbstractSinkProcessor which implements this method and internally constructs a map for quick lookups by the child classes.

Done; this will be in the next patch.
I'm not adding an abstract processor until we see some other uses and where repetition might be taking place.


> On 2012-02-20 02:38:52, Arvind Prabhakar wrote:
> > flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java, line 40
> > <https://reviews.apache.org/r/3750/diff/5/?file=75938#file75938line40>
> >
> >     In the current implementation it seems that the priority of the sink is proportional to the priority value specified. Usually it's the other way around, where a lower priority value means a higher priority.
> >     
> >

This is because sinks without a value are assigned priorities counting down from -1, which leaves positive numbers naturally higher in priority than unassigned sinks. Also, I prefer a higher value meaning a higher priority, and I think it is specified that way in a lot of places.
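A minimal sketch of the scheme described above, assuming unassigned sinks are numbered -1, -2, ... in declaration order (the ordering of the negative values is a guess). `PriorityAssignment` is a hypothetical helper, not code from the patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PriorityAssignment {
    /**
     * Returns sink names ordered from highest to lowest priority.
     * Sinks with an explicit priority keep it; the others get -1, -2, ...
     * in declaration order, so they rank below any non-negative priority.
     */
    static List<String> order(List<String> sinks, Map<String, Integer> configured) {
        Map<String, Integer> effective = new LinkedHashMap<>();
        int next = -1;
        for (String sink : sinks) {
            Integer p = configured.get(sink);
            if (p == null) {
                p = next--; // count down for unassigned sinks
            }
            effective.put(sink, p);
        }
        List<String> ordered = new ArrayList<>(effective.keySet());
        // Higher value = higher priority, so sort in descending order.
        ordered.sort((a, b) -> Integer.compare(effective.get(b), effective.get(a)));
        return ordered;
    }
}
```

One consequence worth checking: an explicitly configured negative priority can tie with an auto-assigned one, which this sketch does not guard against.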


> On 2012-02-20 02:38:52, Arvind Prabhakar wrote:
> > flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java, lines 40-45
> > <https://reviews.apache.org/r/3750/diff/5/?file=75939#file75939line40>
> >
> >     <nit> Perhaps this can be folded into Context implementation much like context.getSubProperties() does.

Agreed.


> On 2012-02-20 02:38:52, Arvind Prabhakar wrote:
> > flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java, line 322
> > <https://reviews.apache.org/r/3750/diff/5/?file=75943#file75943line322>
> >
> >     Suggest you use a LinkedHashMap here to preserve the order of declaration of sinks. Otherwise, this order of declaration will be lost when SinkGroup is instantiated.
> >     
> >     While not every group/processor would need to know the order, we should preserve it nevertheless.

This is now a list, and it is up to the processor what to do with it. FailoverProcessor just turns it into a map for lookups.


> On 2012-02-20 02:38:52, Arvind Prabhakar wrote:
> > flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java, line 31
> > <https://reviews.apache.org/r/3750/diff/5/?file=75940#file75940line31>
> >
> >     Following the convention for other named component types - it will be better to have an Enum for SinkProcessorType which defines the short name and implementation class name.

Added in SinkProcessorType and changed the SinkProcessorFactory to use it.


- Juhani


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/#review5216
-----------------------------------------------------------




Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Arvind Prabhakar <ar...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/#review5216
-----------------------------------------------------------


Thanks for the patch, Juhani. The change is good for check-in. I have some feedback below; feel free to push back and create follow-up issues to address it if you prefer.

* Javadocs for the new interfaces and the default implementation would be great.
* Since the javadoc on PropertiesFileConfigurationProvider is comprehensive, a brief mention of the group semantics there would help keep it up to date.
* In a few places the lines exceed the 80-character limit. I typically enable "show print margin" in my Eclipse workbench, which helps identify those violations.

Some more feedback follows:


flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java
<https://reviews.apache.org/r/3750/#comment11411>

    Since the sinks are named components anyway, the passing of Map<> keyed on the names is not necessary. Using a Map here makes ambiguous the order of declaration which may be of some consequence to the sink processor. Hence I suggest using a List instead.
    
    We can have an AbstractSinkProcessor which implements this method and internally constructs a map for quick lookups by the child classes.
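A sketch of what the suggested base class could look like. It was not added in this patch, so `AbstractSinkProcessor` here is hypothetical, and `NamedSink` is a stand-in for Flume's Sink interface. The sinks are kept as a List to preserve declaration order, and a name-keyed map is built once for lookups by subclasses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a named Flume sink.
interface NamedSink {
    String getName();
}

/** Sketch of the suggested base class: List in, name-keyed map built once. */
abstract class AbstractSinkProcessor {
    private List<NamedSink> sinks = Collections.emptyList();
    private Map<String, NamedSink> sinksByName = Collections.emptyMap();

    public void setSinks(List<NamedSink> sinks) {
        // Defensive copy so later changes to the caller's list are not visible here.
        this.sinks = Collections.unmodifiableList(new ArrayList<>(sinks));
        Map<String, NamedSink> byName = new LinkedHashMap<>();
        for (NamedSink sink : sinks) {
            byName.put(sink.getName(), sink);
        }
        this.sinksByName = Collections.unmodifiableMap(byName);
    }

    /** Sinks in declaration order. */
    protected List<NamedSink> getSinks() {
        return sinks;
    }

    /** Quick lookup by name for subclasses; returns null if unknown. */
    protected NamedSink getSink(String name) {
        return sinksByName.get(name);
    }
}
```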



flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
<https://reviews.apache.org/r/3750/#comment11410>

    In the current implementation it seems that the priority of the sink is proportional to the priority value specified. Usually it's the other way around, where a lower priority value means a higher priority.
    
    



flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
<https://reviews.apache.org/r/3750/#comment11412>

    <nit> Perhaps this can be folded into Context implementation much like context.getSubProperties() does. 
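For reference, prefix-based extraction of the kind `context.getSubProperties()` performs can be sketched over a plain map. This is a stand-in helper, not Flume's Context implementation, and the `processor.` keys in the example are illustrative only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SubProperties {
    /**
     * Returns the entries whose key starts with prefix, with the prefix
     * stripped, e.g. "processor.type" becomes "type" for prefix "processor.".
     */
    static Map<String, String> getSubProperties(Map<String, String> params, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return Collections.unmodifiableMap(result);
    }
}
```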



flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
<https://reviews.apache.org/r/3750/#comment11413>

    Following the convention for other named component types - it will be better to have an Enum for SinkProcessorType which defines the short name and implementation class name.
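A sketch of such an enum, in the style suggested for other named component types. The constant names and accessor are assumptions; the class names are the two processors in this patch's diff. `resolveClassName` falls back to treating an unknown value as a fully qualified class name, which is likewise an assumption about how custom processors would be handled.

```java
/** Sketch: maps processor short names to implementation class names. */
enum SinkProcessorType {
    DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),
    FAILOVER("org.apache.flume.sink.FailoverSinkProcessor");

    private final String processorClassName;

    SinkProcessorType(String processorClassName) {
        this.processorClassName = processorClassName;
    }

    String getSinkProcessorClassName() {
        return processorClassName;
    }

    /** Resolves a short name such as "failover"; otherwise returns the value unchanged. */
    static String resolveClassName(String type) {
        for (SinkProcessorType t : values()) {
            if (t.name().equalsIgnoreCase(type)) {
                return t.getSinkProcessorClassName();
            }
        }
        return type; // assume it is already a fully qualified class name
    }
}
```

The factory would then load the resolved name, for example via `Class.forName(...)`, and instantiate it.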



flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
<https://reviews.apache.org/r/3750/#comment11408>

    Suggest you use a LinkedHashMap here to preserve the order of declaration of sinks. Otherwise, this order of declaration will be lost when SinkGroup is instantiated.
    
    While not every group/processor would need to know the order, we should preserve it nevertheless.
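To illustrate why the map type matters: iterating a HashMap returns entries in hash order, while a LinkedHashMap returns them in insertion order. The hypothetical helper below collects a group's sinks from a space-separated declaration; the declaration format is assumed for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class GroupSinkCollector {
    /**
     * Collects the sinks named in a group's declaration, in declaration order.
     * Names that do not match a configured sink are skipped.
     */
    static <S> Map<String, S> collect(String declaredSinks, Map<String, S> allSinks) {
        Map<String, S> groupSinks = new LinkedHashMap<>(); // keeps insertion order
        for (String name : declaredSinks.trim().split("\\s+")) {
            S sink = allSinks.get(name);
            if (sink != null) {
                groupSinks.put(name, sink);
            }
        }
        return groupSinks;
    }
}
```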


- Arvind




Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Arvind Prabhakar <ar...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/#review5232
-----------------------------------------------------------

Ship it!


+1. Please attach the patch to the Jira. And thanks for pulling through this patiently!

- Arvind




Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

(Updated 2012-02-20 06:31:35.600079)


Review request for Flume.


Changes
-------

Implemented the changes described in the feedback (except for reversing the priorities and adding an abstract processor).


Diffs (updated)
-----

  flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/SinkProcessorType.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java 074aab3 
  flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java 3d0e366 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java 34d6010 
  flume-ng-node/src/test/resources/flume-conf.properties eebf03d 

Diff: https://reviews.apache.org/r/3750/diff


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

(Updated 2012-02-20 01:46:41.732483)


Review request for Flume.


Changes
-------

Fixed a configuration problem and added a sink group to the Flume node configuration test resource, where it appears to be configured correctly.


Diffs (updated)
-----

  flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java 074aab3 
  flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java 3d0e366 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java 34d6010 
  flume-ng-node/src/test/resources/flume-conf.properties eebf03d 

Diff: https://reviews.apache.org/r/3750/diff


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

(Updated 2012-02-20 01:24:03.880370)


Review request for Flume.


Changes
-------

Cleaned up whitespace/warnings


Diffs (updated)
-----

  flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java 074aab3 
  flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java 3d0e366 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java 34d6010 

Diff: https://reviews.apache.org/r/3750/diff


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

(Updated 2012-02-17 10:15:15.740499)


Review request for Flume.


Changes
-------

I've taken into account all the feedback from the JIRA, so this new version is more or less a from-scratch rewrite of the previous patch:

- The concept of SinkGroups is added, mainly to make the configuration more convenient and easier to understand. Groups are checked during configuration validation: sinks that are duplicated or missing are dropped, and a group is invalidated only if it has no sinks left.
- A SinkProcessor interface is introduced, with Default and Failover implementations.
- When a group is not defined, the default processor is used.
- Tests have been updated as necessary, and the test from the original patch was repurposed and modified for this version.
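The group validation rule described above (drop duplicated or missing sinks, invalidate a group only when it ends up empty) could look roughly like the following. This is a hypothetical sketch over plain strings, not the FlumeConfiguration code; in particular it assumes that "duplicated" means a sink already claimed by an earlier group or listed twice, and that groups are checked in declaration order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SinkGroupValidator {
  // Hypothetical validation pass: returns only the groups that survive.
  // `groups` should preserve declaration order (e.g. a LinkedHashMap), since
  // the first group to list a sink keeps it.
  static Map<String, List<String>> validate(Set<String> definedSinks,
                                            Map<String, List<String>> groups) {
    Set<String> claimed = new HashSet<>();
    Map<String, List<String>> valid = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> group : groups.entrySet()) {
      List<String> kept = new ArrayList<>();
      for (String sink : group.getValue()) {
        if (!definedSinks.contains(sink)) {
          continue; // missing: never declared as a sink of this agent
        }
        if (!claimed.add(sink)) {
          continue; // duplicated: already placed in a group
        }
        kept.add(sink);
      }
      // A group is invalidated only when nothing is left in it.
      if (!kept.isEmpty()) {
        valid.put(group.getKey(), kept);
      }
    }
    return valid;
  }

  public static void main(String[] args) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    groups.put("g1", List.of("sink1", "sink2", "ghost")); // "ghost" is not defined
    groups.put("g2", List.of("sink2"));                   // sink2 already in g1
    groups.put("g3", List.of("sink3", "sink3"));          // listed twice
    System.out.println(validate(Set.of("sink1", "sink2", "sink3"), groups));
  }
}
```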

All unit tests pass, and I have run the default processor without problems. As I mentioned earlier in the thread, most sinks do not throw EventDeliveryException, which makes testing beyond unit tests somewhat difficult.

While everything seems OK to me as it is, I'm mainly putting this up now so that people can look at it over the weekend and comment on whether they feel it is an appropriate way to handle this.

Jarcec: you said you had time over the weekend... If you are interested, feel free to fix any bugs; I won't be able to touch the code as I'm out in the wilderness. Or maybe you could do a WeightedSinkProcessor implementation?


Summary
-------

This includes the changes from FLUME-945 and is thus subject to change with it. I have a separate patch for the diff from 945->865 if someone wants it.

A failover sink runner is added, along with a SinkRunnerFactory. The configuration reading is modified to read the runner configuration and pass it to the SinkRunnerFactory.
A failover sink is used by setting the runner type to failover for every participating sink. In addition, each sink assigned to a runner must set the same runner.name and a unique runner.priority.
The runner will poll only the highest-priority live sink. Should that sink fail by throwing EventDeliveryException, the runner moves it to a pile of dead sinks. Only once all sinks are exhausted are the dead sinks revived.
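A minimal sketch of that failover rule, under stated assumptions: `SimpleSink` and `FailoverSketch` are hypothetical names, a plain `Exception` stands in for EventDeliveryException, and a lower priority number is assumed to mean a higher priority (the summary does not say which direction wins). Each poll uses only the best live sink; a failing sink moves to the dead pile, and the dead pile is revived only once no live sinks remain.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal sink: returns normally on success, throws on failure.
interface SimpleSink {
  void process() throws Exception;
}

class FailoverSketch {
  // Assumption: lower number = higher priority, so firstEntry() is the best sink.
  private final TreeMap<Integer, SimpleSink> live = new TreeMap<>();
  private final Map<Integer, SimpleSink> dead = new HashMap<>();

  void add(int priority, SimpleSink sink) {
    if (live.containsKey(priority) || dead.containsKey(priority)) {
      throw new IllegalArgumentException("priority must be unique: " + priority);
    }
    live.put(priority, sink);
  }

  // One poll: only the highest-priority live sink is used.
  void poll() throws Exception {
    if (live.isEmpty()) {
      if (dead.isEmpty()) {
        throw new IllegalStateException("no sinks configured");
      }
      // Every sink has failed: revive the whole dead pile.
      live.putAll(dead);
      dead.clear();
    }
    Map.Entry<Integer, SimpleSink> best = live.firstEntry();
    try {
      best.getValue().process();
    } catch (Exception e) {
      // Move the failed sink to the dead pile and let the caller see the failure.
      live.remove(best.getKey());
      dead.put(best.getKey(), best.getValue());
      throw e;
    }
  }

  int liveCount() {
    return live.size();
  }

  int deadCount() {
    return dead.size();
  }
}
```

Rethrowing after marking the sink dead leaves it to the caller (for example the runner loop) to decide whether to poll again immediately or back off first.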

Some outstanding issues:
- Not all sinks throw EventDeliveryException, or have a clear point at which they should be considered "dead". Coupling such sinks with the failover runner will not result in failover.
- Sinks do not have a clear mechanism to test for their liveness, other than perhaps polling LifecycleState. However, most sinks tend to remain in the START state even when they are unable to do anything (e.g. an Avro sink failing to connect).


This addresses bug FLUME-865.
    https://issues.apache.org/jira/browse/FLUME-865


Diffs (updated)
-----

  .gitignore edf7d58 
  flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java 074aab3 
  flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java 3d0e366 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java 34d6010 

Diff: https://reviews.apache.org/r/3750/diff


Testing
-------

All unit tests pass.
A new unit test was added. It creates a memory channel and a failover runner with 3 sinks, each of which consumes a preset number of events and then throws EventDeliveryException. Events are fed to the channel, and assertions verify that the runner fails over to the correct sinks. This test also passes.

I also tried to run this on a real cluster, with one master source using a failover runner to three Avro sinks, each feeding a separate agent that I would kill off to test the failover. Unfortunately, because AvroSink does not throw EventDeliveryException, this test could not be completed successfully, and I felt that modifying AvroSink was beyond the scope of this ticket.


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Arvind Prabhakar <ar...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/#review5039
-----------------------------------------------------------


Thanks for the patch Juhani. The functionality seems to be progressing well. Some comments/feedback to consider regarding configuration:

The current implementation of configuration mechanism is such that it allows you to specify sinks per agent. Each sink is then associated with a channel. For example, for an agent named "host1" the relevant configuration to tie a sink named "sink1" to a channel named "channel1" will be as follows:

   host1.sinks = sink1 ...
   host1.channels = channel1 ...

   host1.sinks.sink1.type = avro
   host1.sinks.sink1.channel = channel1

If we want to have a failover sink for sink1, say we call it "sink2", then the key problem is to be able to associate this new sink with the previously configured sink1. Moreover, this change should be backward compatible so that existing configuration files continue to work. Given these requirements, the following is a possible solution:

   host1.sinks = sink1 sink2 ...
   host1.channels = channel1 ...

   host1.sinks.sink1.type = avro
   host1.sinks.sink1.channel = channel1
   ...
   host1.sinks.sink2.type = avro
   host1.sinks.sink2.channel = channel1
   ...

   host1.sinks.groups = group1
   host1.sinks.groups.group1.sinks = sink1 sink2
   host1.sinks.groups.group1.policy.type = failover

This introduces the notion of sink groups, where a group is a named entity that can define its own policy. Internally, this can then translate to the SinkRunner picking a policy implementation, backed by an interface, to propagate the event. For example: SinkPolicy.processEvent(Event e).

If no groups are specified, then each sink gets the default policy implementation, which can only handle a single sink at a time. This makes the change backward compatible and gives enough flexibility to express different policies. Further, policy implementations that require more configuration can easily use extra configuration from within the group configuration, much like the way a selector gets its configuration.
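To make the proposal concrete, here is a hypothetical sketch of how a provider might read the sink group keys shown above with java.util.Properties. `SinkGroupConfig` and its `load` method are invented for illustration; what it demonstrates is the backward-compatibility rule, where every sink not named in a group is wrapped in its own single-sink group with the "default" policy.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

class SinkGroupConfig {
  // Hypothetical parsed form of one group: a policy type plus its member sinks.
  static final class Group {
    final String policy;
    final List<String> sinks;

    Group(String policy, List<String> sinks) {
      this.policy = policy;
      this.sinks = sinks;
    }

    @Override
    public String toString() {
      return policy + sinks;
    }
  }

  // Splits a whitespace-separated list value such as "sink1 sink2".
  private static List<String> names(Properties props, String key) {
    String value = props.getProperty(key, "").trim();
    if (value.isEmpty()) {
      return List.of();
    }
    return Arrays.asList(value.split("\\s+"));
  }

  static Map<String, Group> load(Properties props, String agent) {
    Map<String, Group> result = new LinkedHashMap<>();
    Set<String> grouped = new HashSet<>();
    for (String group : names(props, agent + ".sinks.groups")) {
      String prefix = agent + ".sinks.groups." + group;
      List<String> members = names(props, prefix + ".sinks");
      String policy = props.getProperty(prefix + ".policy.type", "default").trim();
      result.put(group, new Group(policy, members));
      grouped.addAll(members);
    }
    // Backward compatibility: each ungrouped sink runs alone under the default policy.
    // (Keyed by sink name here; a real provider would have to handle name clashes.)
    for (String sink : names(props, agent + ".sinks")) {
      if (!grouped.contains(sink)) {
        result.put(sink, new Group("default", List.of(sink)));
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    String conf = String.join("\n",
        "host1.sinks = sink1 sink2 sink3",
        "host1.sinks.groups = group1",
        "host1.sinks.groups.group1.sinks = sink1 sink2",
        "host1.sinks.groups.group1.policy.type = failover");
    Properties props = new Properties();
    props.load(new StringReader(conf));
    System.out.println(load(props, "host1"));
  }
}
```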

Your thoughts?
   
- Arvind


On 2012-02-06 01:12:23, Juhani Connolly wrote:


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Juhani Connolly <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/
-----------------------------------------------------------

(Updated 2012-02-06 01:12:23.474167)


Review request for Flume.


Changes
-------

Updated against the latest version, including FLUME-949.

Important stuff:
- I have not yet made the FailoverSinkRunner a pluggable part of SinkRunner. I think this should be a separate issue, as SinkRunner would need quite a few hooks for this to work (there is also the problem that SinkRunner is currently designed around the assumption that it owns only one sink).
- Configuration is simple (in my opinion at least). I had asked people to look at it and comment, but no one said anything... Maybe looking at the javadoc was too time-consuming, so here is an example with irrelevant details cut out:

source.sinks.avro1.type = avro
source.sinks.avro1.runner.type = failover
source.sinks.avro1.runner.name = fail1
source.sinks.avro1.runner.priority = 1

source.sinks.avro2.type = avro
source.sinks.avro2.runner.type = failover
source.sinks.avro2.runner.name = fail1
source.sinks.avro2.runner.priority = 2

source.sinks.avro3.type = avro
source.sinks.avro3.runner.type = failover
source.sinks.avro3.runner.name = fail1
source.sinks.avro3.runner.priority = 3
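For comparison, the per-sink `runner.*` keys in this example could be collected into failover runners roughly as follows. This is a hypothetical sketch (`RunnerConfigSketch` is not part of the patch): it scans the keys for `<agent>.sinks.<sink>.runner.name`, keeps only sinks whose `runner.type` is `failover`, rejects duplicate priorities within a runner, and orders sinks by ascending priority number, which is an assumption about which value counts as "highest".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class RunnerConfigSketch {
  // Hypothetical: runner name -> sink names ordered by ascending runner.priority.
  static Map<String, List<String>> failoverRunners(Properties props, String agent) {
    String prefix = agent + ".sinks.";
    String suffix = ".runner.name";
    Map<String, TreeMap<Integer, String>> byRunner = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (!key.startsWith(prefix) || !key.endsWith(suffix)
          || key.length() <= prefix.length() + suffix.length()) {
        continue;
      }
      String sink = key.substring(prefix.length(), key.length() - suffix.length());
      String base = prefix + sink + ".runner.";
      if (!"failover".equals(props.getProperty(base + "type", "").trim())) {
        continue; // only sinks explicitly assigned to a failover runner
      }
      String rawPriority = props.getProperty(base + "priority");
      if (rawPriority == null) {
        throw new IllegalArgumentException("sink " + sink + " has no runner.priority");
      }
      int priority = Integer.parseInt(rawPriority.trim());
      String runner = props.getProperty(key).trim();
      TreeMap<Integer, String> sinks = byRunner.computeIfAbsent(runner, r -> new TreeMap<>());
      if (sinks.putIfAbsent(priority, sink) != null) {
        throw new IllegalArgumentException(
            "duplicate priority " + priority + " in runner " + runner);
      }
    }
    Map<String, List<String>> result = new TreeMap<>();
    byRunner.forEach((runner, sinks) -> result.put(runner, new ArrayList<>(sinks.values())));
    return result;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    for (int i = 1; i <= 3; i++) {
      String base = "source.sinks.avro" + i;
      props.setProperty(base + ".type", "avro");
      props.setProperty(base + ".runner.type", "failover");
      props.setProperty(base + ".runner.name", "fail1");
      props.setProperty(base + ".runner.priority", String.valueOf(i));
    }
    System.out.println(failoverRunners(props, "source"));
  }
}
```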


Some of my concerns have not been addressed:
- The policy for sink failure is vague and not respected: some sinks never throw EventDeliveryException. We need to agree on when to fail over (I think EventDeliveryException is a good signal... sinks that do not throw it should be fixed).
- There is no clear method to test for sink liveness. I wanted to add a polling mechanism that regularly attempts to restore dead sinks, but simply trying to send data is not going to work, as it will pollute the logs.

I will also attach this patch to the issue, so if we are happy with it, please commit it. If we can come to a consensus on how to make this pluggable and how to deal with failure, I would be happy to help with those issues too.


Summary
-------

This includes the changes from FLUME-945 and is thus subject to change with it. I have a separate patch for the diff from 945->865 if someone wants it.

A failover sink runner is added, along with a SinkRunnerFactory. The configuration reading is modified to read the runner configuration and pass it to the SinkRunnerFactory.
A failover sink is used by setting the runner type to failover for every participating sink. In addition, each sink assigned to a runner must set the same runner.name and a unique runner.priority.
The runner will poll only the highest-priority live sink. Should that sink fail by throwing EventDeliveryException, the runner moves it to a pile of dead sinks. Only once all sinks are exhausted are the dead sinks revived.

Some outstanding issues:
- Not all sinks throw EventDeliveryException, or have a clear point at which they should be considered "dead". Coupling such sinks with the failover runner will not result in failover.
- Sinks do not have a clear mechanism to test for their liveness, other than perhaps polling LifecycleState. However, most sinks tend to remain in the START state even when they are unable to do anything (e.g. an Avro sink failing to connect).


This addresses bug FLUME-865.
    https://issues.apache.org/jira/browse/FLUME-865


Diffs (updated)
-----

  flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkRunner.java PRE-CREATION 
  flume-ng-core/src/main/java/org/apache/flume/sink/SinkRunnerFactory.java PRE-CREATION 
  flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkRunner.java PRE-CREATION 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java 1ee1f8e 
  flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java 3d0e366 
  flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java 34d6010 

Diff: https://reviews.apache.org/r/3750/diff


Testing
-------

All unit tests pass.
A new unit test was added. It creates a memory channel and a failover runner with 3 sinks, each of which consumes a preset number of events and then throws EventDeliveryException. Events are fed to the channel, and assertions verify that the runner fails over to the correct sinks. This test also passes.

I also tried to run this on a real cluster, with one master source using a failover runner to three Avro sinks, each feeding a separate agent that I would kill off to test the failover. Unfortunately, because AvroSink does not throw EventDeliveryException, this test could not be completed successfully, and I felt that modifying AvroSink was beyond the scope of this ticket.


Thanks,

Juhani


Re: Review Request: FLUME-865 creating a failover sink runner

Posted by Arvind Prabhakar <ar...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3750/#review4804
-----------------------------------------------------------


Thanks for the patch Juhani, the change seems to be in the right direction. Some high-level feedback:

1. This needs to be implemented on top of FLUME-949, which deals with removing the notion of a PollableSink altogether. As a result, the SinkRunner will become a concrete implementation that can then allow different sink handling policies, such as a failover policy (needed for this issue) or a load balancing policy (not needed for this issue). Hence the policy part needs to be pluggable rather than the sink runner itself. An example of such a construct is the ChannelSelector and ChannelProcessor implementations.

2. The most important aspect of this change is the ability to configure the sink runner with relative ease using the configuration mechanism. The properties file configuration provider implementation will then have to be modified in order to wire the runner correctly. The implementation should remain intuitive to the user reading the configuration file while being powerful enough to group multiple sinks with different policies within the agent. Some discussion regarding these choices is definitely in order before we implement anything.
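The pluggable-policy idea in point 1 can be sketched as a single concrete runner that delegates to a policy object chosen by type name, in the same spirit as ChannelSelector. Everything here (`SketchSink`, `SinkPolicy`, `SingleSinkPolicy`, `InOrderFailoverPolicy`, `SinkPolicyFactory`, `ConcreteSinkRunner`) is hypothetical and deliberately simplified; it is not the SinkProcessor API that later revisions of this patch introduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal sink contract: true if an event was delivered.
interface SketchSink {
  boolean process() throws Exception;
}

// The pluggable part: the runner only ever talks to this interface.
interface SinkPolicy {
  boolean process() throws Exception;
}

// Default policy: wraps exactly one sink, no failover (the current behaviour).
class SingleSinkPolicy implements SinkPolicy {
  private final SketchSink sink;

  SingleSinkPolicy(List<SketchSink> sinks) {
    if (sinks.size() != 1) {
      throw new IllegalArgumentException("default policy takes exactly one sink");
    }
    this.sink = sinks.get(0);
  }

  @Override
  public boolean process() throws Exception {
    return sink.process();
  }
}

// Simplified failover policy: tries sinks in list order until one succeeds.
class InOrderFailoverPolicy implements SinkPolicy {
  private final List<SketchSink> sinks;

  InOrderFailoverPolicy(List<SketchSink> sinks) {
    if (sinks.isEmpty()) {
      throw new IllegalArgumentException("failover policy needs at least one sink");
    }
    this.sinks = new ArrayList<>(sinks);
  }

  @Override
  public boolean process() throws Exception {
    Exception last = null;
    for (SketchSink sink : sinks) {
      try {
        return sink.process();
      } catch (Exception e) {
        last = e; // remember the failure and fall through to the next sink
      }
    }
    throw last; // every sink failed
  }
}

// Chooses a policy by its configured type name; no type means "default".
class SinkPolicyFactory {
  static SinkPolicy create(String type, List<SketchSink> sinks) {
    String resolved = (type == null) ? "default" : type;
    switch (resolved) {
      case "default":
        return new SingleSinkPolicy(sinks);
      case "failover":
        return new InOrderFailoverPolicy(sinks);
      default:
        throw new IllegalArgumentException("unknown policy type: " + type);
    }
  }
}

// One concrete runner; its behaviour depends only on the injected policy.
class ConcreteSinkRunner {
  private final SinkPolicy policy;

  ConcreteSinkRunner(SinkPolicy policy) {
    this.policy = policy;
  }

  boolean runOnce() throws Exception {
    return policy.process();
  }
}
```

With this split, adding a new policy (load balancing, weighted, and so on) only touches the factory; the runner itself never changes.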

That said, I suggest we discuss item #2 on the JIRA while waiting for FLUME-949 to be committed.

Thanks

- Arvind


On 2012-02-03 08:53:38, Juhani Connolly wrote: