Posted to user@flume.apache.org by Jagadish Bihani <ja...@pubmatic.com> on 2013/04/16 08:36:12 UTC

"single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Hi

We have a use case in which
1. A spooling directory source reads the data.
2. It needs to write events into multiple channels. It should apply an
interceptor only when putting events into one channel, and should put
the event as it is into the other channel.

Possible approach we have thought:

1. Create 2 different sources, applying the interceptor on one and not on
the other. But that duplicates reads and increases IO.

Is there any better way of achieving this use case?

Regards,
Jagadish


Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Jagadish Bihani <ja...@pubmatic.com>.
Hi

Thanks Jeff and Connor. That was helpful.

@Connor :

Scenario:
=======
We receive logs in which each record contains several fields (around 100).
We want to send every field of every record as it is to HDFS, and send only
a "subset" of the fields of "every" record to another sink (say, Storm).
So we are not filtering records; we are filtering some fields of every
record in one channel and none in the other.
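A minimal plain-Java sketch of this flow, assuming comma-separated records and hypothetical field positions: each record is replicated unchanged into two in-memory queues standing in for the two channels; the HDFS-bound path takes records as they are, while the Storm-bound path keeps only a subset of fields. This is not Flume's API. In Flume the fan-out would come from the source's (replicating) channel selector, and the per-path projection is the part a custom sink-side serializer would do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java simulation of "replicate to two channels, project fields on one path".
// Records are assumed to be comma-separated; the kept field indexes are hypothetical.
class ReplicateAndProject {

    // Stand-ins for two channels fed by the same source.
    final Queue<String> hdfsChannel = new ArrayDeque<>();
    final Queue<String> stormChannel = new ArrayDeque<>();

    // Replicating fan-out: every record goes, unchanged, into both channels.
    void put(String record) {
        hdfsChannel.add(record);
        stormChannel.add(record);
    }

    // HDFS path: take the record exactly as received (null when empty).
    String drainHdfs() {
        return hdfsChannel.poll();
    }

    // Storm path: keep only the requested fields of the next record.
    String drainStorm(int[] keep) {
        String record = stormChannel.poll();
        if (record == null) {
            return null;
        }
        // limit -1 keeps trailing empty fields so indexes stay stable
        String[] fields = record.split(",", -1);
        List<String> subset = new ArrayList<>();
        for (int i : keep) {
            subset.add(i < fields.length ? fields[i] : "");
        }
        return String.join(",", subset);
    }
}
```

For example, after put("id7,alice,US,42,extra"), drainHdfs() returns the full line and drainStorm(new int[]{0, 2}) returns "id7,US".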

Hence, the channel selector solution apparently will not work here, and a
custom event serializer may need to be written. As you mentioned, any
directions you have on that would be of great help.

Regards,
Jagadish


On 04/23/2013 12:22 PM, Connor Woodson wrote:
> Some more thoughts on this:
>
> The way Interceptors currently work is that they are applied to an
> event as it is received by the source. There are good uses for this:
> for instance, it makes it easy to configure a single Timestamp
> interceptor that gives a timestamp to every event a source receives,
> so even if you have multiple sinks/channels handling an event, you
> need only that one interceptor. Interceptors in this sense serve to
> add data to event headers, so it makes sense to have them applied only
> once by the source instead of letting each channel change header data.
>
> If you want to use an interceptor in this way (to modify header data)
> but have it apply to only a single channel, could you elaborate on what
> you are trying to do? I haven't been able to come up with a situation
> like that. The solution here would be to do as Jeff suggested and use a
> serializer. If you want more in-depth instructions on how to build one,
> please ask; I have a set of directions lying around somewhere that I'll
> find for you.
>
>
> That said, I have faced a situation myself where I wanted interceptors
> to apply per channel: I wanted to use an Interceptor to filter events,
> sending each event to some subset of channels based on the contents of
> its data. Here is how you can do this in the current setup (where
> Interceptors are applied at the source instead of per channel):
>
> Using the Multiplexing Channel Selector, you can choose which channels
> an event is written to based on the value of a specified header (see
> the Multiplexing Channel Selector section of the Flume User Guide).
> The selector has some more features that aren't documented, called
> Optional Channels or something similar; I don't know very much about
> them, but I figured I would point out that they exist. Digging through
> the source should provide more insight.
>
> So here is how to set your system up. Create an Interceptor that sets
> a certain header value based on the event's contents. For instance, if
> you want all events containing exactly 1 character to be sent to a
> channel, you could create an Interceptor that counts the characters in
> the event and sets a certain header to "SINGLE" if there is just one
> character, or "MULTIPLE" if there are more.
>
> Then you can create your channel selector like this (modified from the 
> documentation example):
>
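> a1.sources = r1
> a1.channels = all_events single_events multiple_events
> a1.sources.r1.channels = all_events single_events multiple_events
> a1.sources.r1.interceptors = your_interceptor
> # FQCN of your interceptor's Builder; com.example.YourInterceptor is a placeholder
> a1.sources.r1.interceptors.your_interceptor.type = com.example.YourInterceptor$Builder
> a1.sources.r1.interceptors.your_interceptor.header = header
> a1.sources.r1.selector.type = multiplexing
> a1.sources.r1.selector.header = header
> a1.sources.r1.selector.mapping.SINGLE = all_events single_events
> a1.sources.r1.selector.mapping.MULTIPLE = all_events multiple_events
> a1.sources.r1.selector.default = all_events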
>
>
> The result is a way to filter which channels a given event is sent
> to. Note that a channel can appear in more than one mapping; for
> instance, all_events will get all events. So the trick is simply to
> define the right interceptor; interceptors are much simpler to code
> than serializers (which are themselves fairly easy).
>
> Hopefully that was clear. Feel free to ask more questions,
>
> - Connor
>
>
>
> On Fri, Apr 19, 2013 at 11:14 AM, Jeff Lord <jlord@cloudera.com> wrote:
>
>     Jagadish,
>
>     Here is an example of how to write a custom serializer.
>
>     https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
>
>     -Jeff
>
>
>     On Fri, Apr 19, 2013 at 9:34 AM, Jeff Lord <jlord@cloudera.com> wrote:
>
>         Hi Jagadish,
>
>         Have you considered using a custom event serializer to modify
>         your event?
>         It's possible to replicate your flow into two channels and
>         then have one of the sinks use a custom serializer to
>         modify the event.
>
>         -Jeff
>
>
>         On Tue, Apr 16, 2013 at 11:12 PM, Jagadish Bihani
>         <jagadish.bihani@pubmatic.com> wrote:
>
>             Hi
>
>             If anybody has any input on this, it would surely help.
>
>             Regards,
>             Jagadish
>


Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Connor Woodson <cw...@gmail.com>.
Jagadish,

You are right. Your problem here seems to be more about treating your
events differently depending on the sink, and that is what I believe
Serializers are best at. Here are some directions/advice for creating a
serializer (if you look in the list archives for the 'custom serializer'
thread, you will find another set of directions that may also be
useful):

1. I find the best place to start is generally pre-existing code.
BodyTextEventSerializer (the default serializer for the HDFS sink / file
sink if none is defined) and HeaderAndBodyTextEventSerializer (both in
https://github.com/apache/flume/tree/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization)
are the two basic serializers and the best places to start (they do
almost exactly the same thing, so you really only need to look at one of
them). Of all the files at that link, the only other serializer is
FlumeEventAvroEventSerializer (these names are all mouthfuls...). Note
that none of these example serializers implements the configure method;
look at AbstractAvroEventSerializer to see that method implemented.

2. When you copy one of those files, the things you need to change are
obviously the class name (and probably the package name), the
constructor, and the Builder class at the bottom. This Builder class is
what is used to create and configure the serializer: generally, you
create the serializer with "EventSerializer s = new MyEventSerializer(out);"
and then configure it with "s.configure(context);". At least that's how I
do it; BodyTextEventSerializer appears to configure itself in its
constructor instead, and either way is valid, I suppose. The thing to
note, however, is that Builder.build(...) is what gets called to create an
instance of your serializer.

3. The main method is write(Event e): it is given an event, and you are
expected to write the contents of that event in some way to the output
stream that the serializer was created with. After write(...) is called
(possibly several times in a row), flush() will be called; I've never
needed to do anything in that method.

4. Some details on the other methods: supportsReopen() should return
'true' unless there is a reason for it to return 'false'. I believe this
method is only used in the HDFS sink writers, where it is checked to make
sure a serializer is able to append to an existing stream (the relevant
code is at
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java#L98
and
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java#L85).
afterCreate() is called when a serializer is created; afterReopen() is
called instead if the serializer is appending to an existing stream (the
previous links use this; I don't think afterReopen() is called anywhere
else). beforeClose() is called just before the stream is closed, and once
the stream is closed the serializer should be discarded (e.g., set the
variable holding it to null).

5. For my serializers, as I mentioned, I implement the configure method
instead of using the constructor the way BodyTextEventSerializer does. I
don't do much of anything in the other methods except write, which is
where the meat of the code goes. For your case, the way to go about it
would be to use a regex (or whatever else suits your format) in your write
method to pull the event apart into its fields, and then write the subset
of those fields you want to the output stream.
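Putting steps 2 to 5 together, here is a minimal sketch of such a serializer. To compile without Flume, it uses stand-in interfaces that mirror the methods discussed above (afterCreate, afterReopen, write, flush, beforeClose, supportsReopen, and a nested Builder whose build(...) creates the instance). In Flume, Builder.build takes a Context and an OutputStream; a plain Map stands in for the Context here. The class names, the comma-separated record format, and the "fields" setting are all hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Pattern;

// Stand-ins shaped like Flume's Event and EventSerializer (+ Builder) so this compiles
// on its own; a real plugin implements org.apache.flume.serialization.EventSerializer.
interface SerEvent {
    byte[] getBody();
}

interface SketchSerializer {
    void afterCreate() throws IOException;
    void afterReopen() throws IOException;
    void write(SerEvent event) throws IOException;
    void flush() throws IOException;
    void beforeClose() throws IOException;
    boolean supportsReopen();

    interface Builder {
        // Flume passes a Context here; a plain Map stands in for it.
        SketchSerializer build(Map<String, String> context, OutputStream out);
    }
}

// Hypothetical serializer: writes only selected comma-separated fields of each event.
final class FieldSubsetSerializer implements SketchSerializer {
    private static final Pattern FIELD_SEP = Pattern.compile(",");
    private final OutputStream out;
    private int[] keep = new int[0];

    private FieldSubsetSerializer(OutputStream out) { this.out = out; }

    // Configure step, kept separate from the constructor.
    // "fields" is a hypothetical setting such as "0,2".
    void configure(Map<String, String> context) {
        String spec = context.getOrDefault("fields", "");
        keep = spec.isEmpty() ? new int[0]
            : FIELD_SEP.splitAsStream(spec).mapToInt(s -> Integer.parseInt(s.trim())).toArray();
    }

    public void afterCreate() { }
    public void afterReopen() { }
    public boolean supportsReopen() { return true; }
    public void flush() throws IOException { out.flush(); }
    public void beforeClose() { }  // nothing to finish; the caller closes the stream afterwards

    // Split the body into fields and write only the configured subset, one line per event.
    public void write(SerEvent event) throws IOException {
        String[] fields = FIELD_SEP.split(new String(event.getBody(), StandardCharsets.UTF_8), -1);
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < keep.length; i++) {
            if (i > 0) line.append(',');
            if (keep[i] < fields.length) line.append(fields[keep[i]]);
        }
        line.append('\n');
        out.write(line.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Builder.build(...) is what creates (and here, configures) each instance.
    static final class Builder implements SketchSerializer.Builder {
        public SketchSerializer build(Map<String, String> context, OutputStream out) {
            FieldSubsetSerializer s = new FieldSubsetSerializer(out);
            s.configure(context);
            return s;
        }
    }
}
```

With fields set to "0, 2", writing the event "id7,alice,US,42" emits the line "id7,US". Configuring inside build(...) rather than in the constructor follows the approach described in step 5; either works.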

I believe that's about it; if anything's unclear, I'll be more than happy
to fix it up.

An important note about the Builder, which I tried to cover above: when
you supply the FQCN of your custom serializer in the
"agent.sinks.<sink>.serializer" property (for the HDFS sink), you supply
the FQCN of the Builder inner class, so it looks like this:
com.connor.MySerializer$Builder
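The $ is there because Outer$Builder is the JVM's binary name for a nested class, which is the form Class.forName(...) expects; the dotted form used in Java source (MySerializer.Builder) will not load. The sketch below shows this with a hypothetical class. It roughly mimics what a config-driven plugin loader does and does not reproduce Flume's own loading code.

```java
import java.lang.reflect.Constructor;

// Hypothetical serializer class with the nested Builder a sink config would name.
final class MySerializerDemo {

    // Source code refers to this as MySerializerDemo.Builder, but its binary name
    // (what Class.getName() returns and Class.forName() accepts) is
    // MySerializerDemo$Builder, prefixed by its package if it has one.
    static final class Builder { }

    // Load and instantiate a class from its configured name, roughly as a
    // config-driven plugin loader might (not Flume's actual code).
    static Object instantiate(String binaryName) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(binaryName);
        Constructor<?> ctor = clazz.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }
}
```

For com.connor.MySerializer, that means the property value com.connor.MySerializer$Builder rather than com.connor.MySerializer.Builder.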

And here is some documentation on the best way to include your custom
serializer in Flume:
https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst#installing-third-party-plugins

- Connor


On Tue, Apr 23, 2013 at 7:15 AM, Israel Ekpo <is...@aicer.org> wrote:

> Connor,
>
> This is a great example.
>
> Thank you for sharing this. It was an excellent tutorial.
>
> I will create a JIRA issue to document this workaround in the user guide.

Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Israel Ekpo <is...@aicer.org>.
Connor,

This is a great example.

Thank you for sharing this. It was an excellent tutorial.

I will create a JIRA issue to document this workaround in the user guide.





Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Connor Woodson <cw...@gmail.com>.
Heh, I forgot to link the multiplexing channel selector documentation.

Here it is: http://flume.apache.org/FlumeUserGuide.html#multiplexing-channel-selector

- Connor



Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Connor Woodson <cw...@gmail.com>.
Some more thoughts on this:

The way Interceptors currently work is that they are applied to an event
as it is received by the source. There are good uses for this: for
instance, it makes it easy to configure a single Timestamp interceptor that
gives a timestamp to every event a source receives, so even if you have
multiple sinks/channels handling an event, you need only that one
interceptor. Interceptors in this sense serve to add data to event headers,
so it makes sense to have them applied only once by the source instead of
letting each channel change header data.
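A small plain-Java simulation of the point above (not Flume code): the source runs its interceptor once per event and then hands the intercepted event to every channel, so a header such as a timestamp is the same in all channels. I believe Flume does this ordering in the source's ChannelProcessor; the class and method names here are illustrative only, and the clock value is passed in to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simulation only: a "source" applies its interceptor once, then hands the
// intercepted event to every channel. Names are illustrative, not Flume's API.
final class SourceSideInterception {

    // Minimal event: headers plus a body, loosely modeled on Flume's Event.
    static final class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;
        SimpleEvent(byte[] body) { this.body = body; }
    }

    // Timestamp-style interceptor: adds a header and leaves the body alone.
    static UnaryOperator<SimpleEvent> timestampInterceptor(long now) {
        return e -> {
            e.headers.put("timestamp", Long.toString(now));
            return e;
        };
    }

    final List<List<SimpleEvent>> channels = new ArrayList<>();
    private final UnaryOperator<SimpleEvent> interceptor;

    SourceSideInterception(int channelCount, UnaryOperator<SimpleEvent> interceptor) {
        for (int i = 0; i < channelCount; i++) {
            channels.add(new ArrayList<>());
        }
        this.interceptor = interceptor;
    }

    // The interceptor runs exactly once per event, before any channel sees it,
    // so every channel receives the same headers.
    void receive(SimpleEvent e) {
        SimpleEvent intercepted = interceptor.apply(e);
        for (List<SimpleEvent> channel : channels) {
            channel.add(intercepted);
        }
    }
}
```

There is no hook in this model for a channel to apply its own interceptor, which is the limitation being described.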

If you want to use an interceptor in this way (to modify header data) but
have it apply to only a single channel, could you elaborate on what you are
trying to do? I haven't been able to come up with a situation like that.
The solution here would be to do as Jeff suggested and use a serializer. If
you want more in-depth instructions on how to build one, please ask; I have
a set of directions lying around somewhere that I'll find for you.


That said, I have faced a situation myself where I wanted interceptors to
apply per channel: I wanted to use an Interceptor to filter events, sending
each event to some subset of channels based on the contents of its data.
Here is how you can do this in the current setup (where Interceptors are
applied at the source instead of per channel):

Using the Multiplexing Channel Selector, you can choose which channels an
event is written to based on the value of a specified header (see the
Multiplexing Channel Selector section of the Flume User Guide). The
selector has some more features that aren't documented, called Optional
Channels or something similar; I don't know very much about them, but I
figured I would point out that they exist. Digging through the source
should provide more insight.

So here is how to set your system up. Create an Interceptor that sets a
certain header value based on the event's contents. For instance, if you
want all events containing exactly 1 character to be sent to a channel, you
could create an Interceptor that counts the characters in the event and
sets a certain header to "SINGLE" if there is just one character, or
"MULTIPLE" if there are more.

Then you can create your channel selector like this (modified from the
documentation example):

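a1.sources = r1
a1.channels = all_events single_events multiple_events
a1.sources.r1.channels = all_events single_events multiple_events
a1.sources.r1.interceptors = your_interceptor
# FQCN of your interceptor's Builder; com.example.YourInterceptor is a placeholder
a1.sources.r1.interceptors.your_interceptor.type = com.example.YourInterceptor$Builder
a1.sources.r1.interceptors.your_interceptor.header = header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = header
a1.sources.r1.selector.mapping.SINGLE = all_events single_events
a1.sources.r1.selector.mapping.MULTIPLE = all_events multiple_events
a1.sources.r1.selector.default = all_events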
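The character-counting interceptor that this configuration expects could be sketched as follows, under stated assumptions. To stay self-contained it implements small stand-in interfaces shaped like Flume's Event and Interceptor (initialize, intercept for one event and for a list, close) rather than the real org.apache.flume.interceptor.Interceptor, and all class names are hypothetical. A real plugin would also need a nested Builder (the class named in the type property), omitted here. "Characters" are counted as Unicode code points, and an empty body gets no header so the selector's default mapping applies.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-ins shaped like Flume's Event and Interceptor so the sketch compiles on its own.
interface SketchEvent {
    Map<String, String> getHeaders();
    byte[] getBody();
}

interface SketchInterceptor {
    void initialize();
    SketchEvent intercept(SketchEvent event);
    List<SketchEvent> intercept(List<SketchEvent> events);
    void close();
}

final class SimpleSketchEvent implements SketchEvent {
    private final Map<String, String> headers = new HashMap<>();
    private final byte[] body;
    SimpleSketchEvent(String text) { this.body = text.getBytes(StandardCharsets.UTF_8); }
    public Map<String, String> getHeaders() { return headers; }
    public byte[] getBody() { return body; }
}

// Hypothetical interceptor: tags each event SINGLE or MULTIPLE by character count.
final class CharCountInterceptor implements SketchInterceptor {
    private final String headerName; // matches ...interceptors.your_interceptor.header

    CharCountInterceptor(String headerName) { this.headerName = headerName; }

    public void initialize() { }

    public SketchEvent intercept(SketchEvent event) {
        String text = new String(event.getBody(), StandardCharsets.UTF_8);
        int chars = text.codePointCount(0, text.length());
        if (chars == 1) {
            event.getHeaders().put(headerName, "SINGLE");
        } else if (chars > 1) {
            event.getHeaders().put(headerName, "MULTIPLE");
        }
        // Empty bodies get no header, so the selector falls back to its default.
        return event;
    }

    public List<SketchEvent> intercept(List<SketchEvent> events) {
        List<SketchEvent> out = new ArrayList<>(events.size());
        for (SketchEvent e : events) {
            out.add(intercept(e));
        }
        return out;
    }

    public void close() { }
}
```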


The result is a way to filter which channels a given event is sent to.
Note that a channel can appear in more than one mapping; for instance,
all_events will get all events. So the trick is simply to define the right
interceptor; interceptors are much simpler to code than serializers (which
are themselves fairly easy).
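To make the routing concrete, here is a plain-Java model of how a multiplexing selector picks channels from a single header, using the mappings from the configuration above. It is not Flume's MultiplexingChannelSelector; it only mirrors the behavior that an event whose header value has no mapping (or that lacks the header entirely) goes to the default channels.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a multiplexing selector choosing channels from one header.
// It mirrors the a1.sources.r1.selector.* settings; it is not Flume's class.
final class MultiplexRouter {
    private final String header;
    private final Map<String, List<String>> mapping = new HashMap<>();
    private final List<String> defaults;

    MultiplexRouter(String header, List<String> defaults) {
        this.header = header;
        this.defaults = defaults;
    }

    // Equivalent of selector.mapping.<value> = <channels...>
    MultiplexRouter map(String value, String... channels) {
        mapping.put(value, Arrays.asList(channels));
        return this;
    }

    // Missing or unmapped header values fall back to selector.default.
    List<String> select(Map<String, String> eventHeaders) {
        String value = eventHeaders.get(header);
        List<String> chosen = value == null ? null : mapping.get(value);
        return Collections.unmodifiableList(chosen != null ? chosen : defaults);
    }
}
```

For example, an event with header=SINGLE is routed to [all_events, single_events], while an event without the header goes only to [all_events].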

Hopefully that was clear. Feel free to ask more questions,

- Connor




Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Jeff Lord <jl...@cloudera.com>.
Jagadish,

Here is an example of how to write a custom serializer.

https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java

-Jeff


On Fri, Apr 19, 2013 at 9:34 AM, Jeff Lord <jl...@cloudera.com> wrote:

> Hi Jagadish,
>
> Have you considered using a custom event serializer to modify your event?
> It's possible to replicate your flow using two channels and then have one
> sink that implements a custom serializer to modify the event.
>
> -Jeff

Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Jeff Lord <jl...@cloudera.com>.
Hi Jagadish,

Have you considered using a custom event serializer to modify your event?
It's possible to replicate your flow using two channels and then have one
sink that implements a custom serializer to modify the event.
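For a case like the one Jagadish describes elsewhere in this thread (every field to HDFS, only a subset of fields to a second sink), the per-event work of such a serializer might look like the sketch below. It assumes, hypothetically, that each event body is one line of tab-separated fields and that the fields to keep are chosen by zero-based index. `FieldProjectingWriter` is an illustrative name, not a Flume class; in a real deployment this logic would sit in the write path of a custom serializer attached to only one of the two sinks.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical helper: writes only the selected fields of each delimited record.
class FieldProjectingWriter {
    private final OutputStream out;
    private final String delimiter;
    private final int[] keep;

    FieldProjectingWriter(OutputStream out, String delimiter, int... keep) {
        this.out = out;
        this.delimiter = delimiter;
        this.keep = keep;
    }

    // Writes the kept fields of one record, joined by the same delimiter, plus a newline.
    void write(byte[] body) throws IOException {
        String[] fields = new String(body, StandardCharsets.UTF_8)
            .split(Pattern.quote(delimiter), -1); // -1 keeps trailing empty fields
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keep.length; i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            int idx = keep[i];
            // A record that is too short yields an empty field instead of an exception.
            sb.append(idx < fields.length ? fields[idx] : "");
        }
        sb.append('\n');
        out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        FieldProjectingWriter w = new FieldProjectingWriter(buf, "\t", 0, 2);
        w.write("id42\tuser\tclick\t2013-04-16".getBytes(StandardCharsets.UTF_8));
        // prints id42, a tab, click, then a newline
        System.out.print(buf.toString("UTF-8"));
    }
}
```

Because the unmodified copy of each event still flows through the other channel and sink, the spooling source reads each file only once.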

-Jeff


On Tue, Apr 16, 2013 at 11:12 PM, Jagadish Bihani <
jagadish.bihani@pubmatic.com> wrote:

> Hi
>
> If anybody has any input on this, it would surely help.
>
> Regards,
> Jagadish
>

Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches

Posted by Jagadish Bihani <ja...@pubmatic.com>.
Hi

If anybody has any input on this, it would surely help.

Regards,
Jagadish

On 04/16/2013 12:06 PM, Jagadish Bihani wrote:
> Hi
>
> We have a use case in which
> 1. spooling source reads data.
> 2. It needs to write events into multiple channels. It should apply
> interceptor only when putting into one channel and should put
> the event as it is while putting into another channel.
>
> Possible approach we have thought:
>
> 1. Create 2 different sources and then apply interceptor on one and don't
> apply on other. But that duplicates reads and increases IO.
>
> Is there any better way of achieving this use case?
>
> Regards,
> Jagadish
>