Posted to user@flume.apache.org by Alexandru Sicoe <ad...@gmail.com> on 2013/10/02 13:45:30 UTC

Splitting the event flow in Flume

Hello everyone,

My setup is the following: I am pulling xml messages from RabbitMQ via a
RabbitMQ Flume Source. Attached to this Source is an Interceptor which
parses the xml into csv, and eventually the csv message is dumped to a csv
file by the Sink. Both the Interceptor and the Sink are my own custom
implementations.

A simple sketch of this would be:
rabbitmq -> source -> interceptor -> channel -> sink -> file.csv

This works fine!
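
A minimal sketch of the shape of such an interceptor is below (the
xmlToCsv helper is just a placeholder, not the real conversion code):

package resilient.flume;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Rewrites each event body from xml to csv; everything else passes through.
public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to set up in this sketch
    }

    @Override
    public Event intercept(Event event) {
        String xml = new String(event.getBody(), StandardCharsets.UTF_8);
        String csv = xmlToCsv(xml);   // placeholder for the real parsing logic
        event.setBody(csv.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    private String xmlToCsv(String xml) {
        // stand-in: the actual xml-to-csv flattening would go here
        return xml;
    }

    // Flume instantiates interceptors through a Builder named in the config
    // (resilient.flume.MyInterceptor$Builder).
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no interceptor-level parameters in this sketch
        }
    }
}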

Now I need to figure out how to also dump the raw xml content to an xml
file, in addition to the parsed csv content.

I have devised several methods to achieve this. I would like some advice on
whether these methods are possible and which one is best.

1. Pulling the xml twice from RabbitMQ

rabbitmq -> source1 -> interceptor -> channel -> sink -> file.csv
         -> source2 -> channel -> sink -> file.xml

2. Pulling the xml once but generating both a csv and an xml from the
Interceptor.

rabbitmq -> source1 -> interceptor -> channel -> sink -> file.csv
                                   -> channel -> sink -> file.xml

3. Pulling the xml once and having a fan-out source and an Interceptor
before the sink.

rabbitmq -> source1 -> channel -> interceptor -> sink -> file.csv
                    -> channel -> sink -> file.xml
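
For the fan-out part itself, a Flume source can feed several channels
through a replicating channel selector; a sketch of that fragment, with
memch2 as a hypothetical second channel, would be:

agent1.channels = memch1 memch2
agent1.sources.rabbitmq-source1.channels = memch1 memch2
agent1.sources.rabbitmq-source1.selector.type = replicating

The open question is where the interceptor can sit.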

In my opinion option 3 would be the best since it doesn't require pulling
the xml twice from RabbitMQ and doesn't require any change in the code that
I wrote. The problem is that I'm not sure it is possible. I tried the
following config file without success:

agent1.sources = rabbitmq-source1
agent1.channels = memch1
agent1.sinks = Console

agent1.sources.rabbitmq-source1.channels = memch1
agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
agent1.sources.rabbitmq-source1.hostname = localhost
agent1.sources.rabbitmq-source1.queuename = hello

agent1.sinks.Console.interceptors = interceptor1
agent1.sinks.Console.interceptors.interceptor1.type = resilient.flume.MyInterceptor$Builder
agent1.sinks.Console.channel = memch1
agent1.sinks.Console.type = logger

agent1.channels.memch1.type = memory

Am I doing something wrong? Or is option 3 not possible at all?

Thanks,
Alex

RE: Splitting the event flow in Flume

Posted by Paul Chavez <pc...@verticalsearchworks.com>.
Option 3 is possible with two agents.

Agent1:

Rabbitmq -> source1 -> replicating channel selector
    -> channel1 -> avro sink1 -> agent2.avrosource1
    -> channel2 -> avro sink2 -> agent2.avrosource2

Agent2:
#CSV path with your interceptor
Avrosource1 -> interceptor -> channel -> sink to CSV
#XML path with no interceptor
Avrosource2 -> channel -> sink to XML
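
A minimal sketch of the corresponding configs (the host names, port
numbers, and component names below are assumptions, adjust them to your
environment):

# agent1: replicate every event to two channels, each drained by an avro sink
agent1.sources = rabbitmq-source1
agent1.channels = ch1 ch2
agent1.sinks = avrosink1 avrosink2

agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
agent1.sources.rabbitmq-source1.hostname = localhost
agent1.sources.rabbitmq-source1.queuename = hello
agent1.sources.rabbitmq-source1.channels = ch1 ch2
agent1.sources.rabbitmq-source1.selector.type = replicating

agent1.sinks.avrosink1.type = avro
agent1.sinks.avrosink1.channel = ch1
agent1.sinks.avrosink1.hostname = agent2-host
agent1.sinks.avrosink1.port = 4141

agent1.sinks.avrosink2.type = avro
agent1.sinks.avrosink2.channel = ch2
agent1.sinks.avrosink2.hostname = agent2-host
agent1.sinks.avrosink2.port = 4142

agent1.channels.ch1.type = memory
agent1.channels.ch2.type = memory

# agent2: the CSV path carries the interceptor on its source, the XML path does not
agent2.sources = avrosource1 avrosource2
agent2.channels = csvch xmlch

agent2.sources.avrosource1.type = avro
agent2.sources.avrosource1.bind = 0.0.0.0
agent2.sources.avrosource1.port = 4141
agent2.sources.avrosource1.channels = csvch
agent2.sources.avrosource1.interceptors = interceptor1
agent2.sources.avrosource1.interceptors.interceptor1.type = resilient.flume.MyInterceptor$Builder

agent2.sources.avrosource2.type = avro
agent2.sources.avrosource2.bind = 0.0.0.0
agent2.sources.avrosource2.port = 4142
agent2.sources.avrosource2.channels = xmlch

agent2.channels.csvch.type = memory
agent2.channels.xmlch.type = memory
# ...plus your existing file sinks for CSV and XML, one per channel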

Hope that helps,
Paul

Re: Splitting the event flow in Flume

Posted by Arvind Prabhakar <ar...@apache.org>.
Hi Alex,

Unfortunately, interceptors can only be applied to sources. At this time
we do not have support for sink-side interceptors. FLUME-1207 tracks this
request:

https://issues.apache.org/jira/browse/FLUME-1207
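
In configuration terms this means the interceptor keys hang off a source
rather than a sink, for example (sketch only):

agent1.sources.rabbitmq-source1.interceptors = interceptor1
agent1.sources.rabbitmq-source1.interceptors.interceptor1.type = resilient.flume.MyInterceptor$Builder

Note that within a single agent this converts events before they reach any
channel the source feeds, so the raw xml would be lost on every path; that
is why a layout like the two-agent one above keeps the xml on a separate
path.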

Regards,
Arvind Prabhakar