Posted to user@flume.apache.org by "Wang, Yongkun | Yongkun | DU" <yo...@mail.rakuten.com> on 2011/06/29 10:25:03 UTC

Multi hop data flow with end-to-end(E2E) sink

Hi,

I am testing using multi-tier flume nodes with multi hops (more than 3 hops). 

The simplest case is described as follows:

Flow: 
agent1 -> collector1 -> collector2 -> hdfs

Configuration
agent1 : tail("file") | agentE2ESink("collector1", 35854) ;
collector1 : collectorSource(35854) | agentE2ESink("collector2", 35855) ;
collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-") ;

The sink configuration of the bridge node "collector1" seems tricky. Maybe using agentE2ESink is not correct, because collector2 crashed with an exception in my test. 

If I configured "collector1" with agentDFOSink, agentBESink, or rpcSink, the whole system worked well. And I checked the log message on "collector2"; it showed that collector2 was working in end-to-end mode. It seems that the whole workflow will follow E2E mode as long as the original agent is configured with an E2E sink. The sink configuration of the intermediate node (collector1) doesn't have any influence.

I would like to get your confirmation or explanation on this point. Thank you very much.

I got the following exception when collector1 was configured with agentE2ESink:

2011-06-28 21:12:54,004 [logicalNode collector1-47] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because Event already had an event with attribute AckType
java.lang.IllegalArgumentException: Event already had an event with attribute AckType
        at com.cloudera.flume.handlers.thrift.ThriftEventAdaptor.set(ThriftEventAdaptor.java:184)
        at com.cloudera.flume.handlers.endtoend.AckChecksumInjector.append(AckChecksumInjector.java:139)
        at com.cloudera.flume.agent.durability.NaiveFileWALManager$1.append(NaiveFileWALManager.java:457)
        at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:174)
        at com.cloudera.flume.agent.durability.NaiveFileWALDeco.append(NaiveFileWALDeco.java:138)
        at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:112)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:75)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)


Regards,
Kun

RE: Multi hop data flow with end-to-end(E2E) sink

Posted by "Wang, Yongkun | Yongkun | DU" <yo...@mail.rakuten.com>.
Jon, 

Thank you for the analysis.

For the multi-hop flow, I have realized that only the originating node should be configured with an E2E sink, with a BE sink on the middle nodes and a collectorSink on the last node.

We may not need any source-code changes to prevent the crash when a user configures an E2E sink on a middle node,
but I think it would be worth documenting this E2E multi-hop configuration how-to in the Flume user guide.
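For reference, a corrected version of my earlier configuration would look like this, with the E2E sink only on the originating node and agentBESink on the middle node per Jon's suggestion (ports and paths as in my first mail):

```
agent1 : tail("file") | agentE2ESink("collector1", 35854) ;
collector1 : collectorSource(35854) | agentBESink("collector2", 35855) ;
collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-") ;
```

With this setup, only agent1 writes a WAL and injects ack metadata, so collector1 forwards events without touching the AckType attribute.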

Regards,
Kun

-----Original Message-----
From: Jonathan Hsieh [mailto:jon@cloudera.com] 
Sent: Sunday, July 03, 2011 7:20 AM
To: flume-user@incubator.apache.org
Subject: Re: Multi hop data flow with end-to-end(E2E) sink

Hi Kun 

Welcome to the new list!

Answer is inline.


Jon

On Wed, Jun 29, 2011 at 1:25 AM, Wang, Yongkun | Yongkun | DU <yo...@mail.rakuten.com> wrote:


	Hi,
	
	I am testing using multi-tier flume nodes with multi hops (more than 3 hops).
	
	The simplest case is described as follows:
	
	Flow:
	agent1 -> collector1 -> collector2 -> hdfs
	
	Configuration
	agent1 : tail("file") | agentE2ESink("collector1", 35854) ;
	collector1 : collectorSource(35854) | agentE2ESink("collector2", 35855) ;
	collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-") ;
	
	The sink configuration of the bridge node "collector1" seems tricky. Maybe using agentE2ESink is not correct, because collector2 crashed with an exception in my test.
	 

	If I configured "collector1" with agentDFOSink, agentBESink, or rpcSink, the whole system worked well. And I checked the log message on "collector2"; it showed that collector2 was working in end-to-end mode. It seems that the whole workflow will follow E2E mode as long as the original agent is configured with an E2E sink. The sink configuration of the intermediate node (collector1) doesn't have any influence.
	
	

Ah, I think I know what the problem is -- the middle node (collector1)'s agentE2ESink will try to add more metadata to the message, and the node will complain.  By pushing data through another agentE2ESink, you are actually adding another write-ahead log into the stream, which is unnecessary for end-to-end reliability.  You only need the agentE2ESink on the originating node and on the final endpoint.  The other modes work because they don't try to add metadata.  My suggestion is to use agentBESink/Chain in the intermediate node.
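To illustrate the failure mode, here is a minimal, hypothetical Java sketch of the set-once attribute contract (not the actual ThriftEventAdaptor code): each ack injector writes an AckType attribute onto the event, and a second injection on an intermediate node trips the duplicate check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of Flume's set-once event-attribute contract
// (not the real ThriftEventAdaptor): an attribute such as AckType may
// be written only once per event, so a second ack/WAL injection on a
// middle node raises the IllegalArgumentException from the stack trace.
public class SketchEvent {
    private final Map<String, byte[]> attrs = new HashMap<>();

    public void set(String name, byte[] value) {
        if (attrs.containsKey(name)) {
            throw new IllegalArgumentException(
                "Event already had an event with attribute " + name);
        }
        attrs.put(name, value);
    }

    public static void main(String[] args) {
        SketchEvent e = new SketchEvent();
        e.set("AckType", "checksum".getBytes()); // first injector (agent1): ok
        try {
            e.set("AckType", "checksum".getBytes()); // second injector (collector1)
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

The non-E2E sinks avoid this path because they never run the ack injector, which matches the behavior you observed.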
 

	I would like to get your confirmation or explanation on this point. Thank you very much.
	
	I got the following exception when collector1 was configured with agentE2ESink:
	
	2011-06-28 21:12:54,004 [logicalNode collector1-47] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because Event already had an event with attribute AckType
	java.lang.IllegalArgumentException: Event already had an event with attribute AckType
	       at com.cloudera.flume.handlers.thrift.ThriftEventAdaptor.set(ThriftEventAdaptor.java:184)
	       at com.cloudera.flume.handlers.endtoend.AckChecksumInjector.append(AckChecksumInjector.java:139)
	       at com.cloudera.flume.agent.durability.NaiveFileWALManager$1.append(NaiveFileWALManager.java:457)
	       at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:174)
	       at com.cloudera.flume.agent.durability.NaiveFileWALDeco.append(NaiveFileWALDeco.java:138)
	       at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:112)
	       at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
	       at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:75)
	       at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
	
	

Yup, this confirmed the attempt to overwrite metadata (in this case it is the AckType attribute).
 


	Regards,
	Kun




--
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera

// jon@cloudera.com
 


Re: Multi hop data flow with end-to-end(E2E) sink

Posted by Jonathan Hsieh <jo...@cloudera.com>.
Hi Kun

Welcome to the new list!

Answer is inline.

Jon

On Wed, Jun 29, 2011 at 1:25 AM, Wang, Yongkun | Yongkun | DU <
yongkun.wang@mail.rakuten.com> wrote:

> Hi,
>
> I am testing using multi-tier flume nodes with multi hops (more than 3
> hops).
>
> The simplest case is described as follows:
>
> Flow:
> agent1 -> collector1 -> collector2 -> hdfs
>
> Configuration
> agent1 : tail("file") | agentE2ESink("collector1", 35854) ;
> collector1 : collectorSource(35854) | agentE2ESink("collector2", 35855) ;
> collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-")
> ;
>
> The sink configuration of the bridge node "collector1" seems tricky. Maybe
> using agentE2ESink is not correct, because collector2 crashed with an
> exception in my test.
>
>
> If I configured "collector1" with agentDFOSink, agentBESink, or rpcSink,
> the whole system worked well. And I checked the log message on "collector2";
> it showed that collector2 was working in end-to-end mode. It seems that the
> whole workflow will follow E2E mode as long as the original agent is
> configured with an E2E sink. The sink configuration of the intermediate node
> (collector1) doesn't have any influence.
>
Ah, I think I know what the problem is -- the middle node (collector1)'s
agentE2ESink will try to add more metadata to the message, and the node will
complain.  By pushing data through another agentE2ESink, you are actually adding
another write-ahead log into the stream, which is unnecessary for end-to-end
reliability.  You only need the agentE2ESink on the originating node and on the
final endpoint.  The other modes work because they don't try to add
metadata.  My suggestion is to use agentBESink/Chain in the intermediate
node.


> I would like to get your confirmation or explanation on this point. Thank
> you very much.
>
> I got the following exception when collector1 was configured with
> agentE2ESink:
>
> 2011-06-28 21:12:54,004 [logicalNode collector1-47] ERROR
> connector.DirectDriver: Driving src/sink failed! LazyOpenSource |
> LazyOpenDecorator because Event already had an event with attribute AckType
> java.lang.IllegalArgumentException: Event already had an event with
> attribute AckType
>        at
> com.cloudera.flume.handlers.thrift.ThriftEventAdaptor.set(ThriftEventAdaptor.java:184)
>        at
> com.cloudera.flume.handlers.endtoend.AckChecksumInjector.append(AckChecksumInjector.java:139)
>        at
> com.cloudera.flume.agent.durability.NaiveFileWALManager$1.append(NaiveFileWALManager.java:457)
>        at
> com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:174)
>        at
> com.cloudera.flume.agent.durability.NaiveFileWALDeco.append(NaiveFileWALDeco.java:138)
>        at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:112)
>        at
> com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at
> com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:75)
>        at
> com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
>
Yup, this confirmed the attempt to overwrite metadata (in this case it is
the AckType attribute).


>
> Regards,
> Kun
>



-- 
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com