Posted to user@flume.apache.org by Zhishan Li <zh...@gmail.com> on 2015/11/12 13:34:16 UTC

flume interceptor issue

I have a question: how do I set two interceptors for a channel?

Here is my configuration:

# configuration start
tier1.sources.KafkaSource.channels = c1 c2
tier1.sources.KafkaSource.interceptors = i1 i2
tier1.sources.KafkaSource.interceptors.i1.type = host
tier1.sources.KafkaSource.interceptors.i1.useIP = true

tier1.sources.KafkaSource.interceptors.i2.type = org.apache.flume.interceptor.My$Builder
tier1.sources.KafkaSource.interceptors.i2.key = logType
tier1.sources.KafkaSource.interceptors.i2.value = app

tier1.sources.KafkaSource.selector.type = multiplexing
tier1.sources.KafkaSource.selector.header = logType
tier1.sources.KafkaSource.selector.mapping.app = c1
tier1.sources.KafkaSource.selector.default = c2



The JSON-format events from KafkaSource are processed by the interceptor My$Builder, which divides the source data into two parts: records with logType=app and records with logType != app.

For example:

{"logType":"app","id":1,"env":"test"}
{"logType":"phone","id":2}
{"logType":"phone","id":3,"env":"test"}

The record with id=1 will be sent to channel c1, and the records with id=2 and id=3 will be sent to channel c2.

So my question is: what new interceptor can I add so that the record with logType=app and env=test is sent to channel c2?

My idea is to append the following interceptor:

tier1.sources.KafkaSource.interceptors.i3.type = org.apache.flume.interceptor.Myr$Builder
tier1.sources.KafkaSource.interceptors.i3.key = env
tier1.sources.KafkaSource.interceptors.i3.value = test

But it still causes the record with id=2 to be sent to channel c2 too.

Please help me figure this out.

I expect interceptor i3 to consume only the part of i2's output with logType=app.

Thanks 


Re: flume interceptor issue

Posted by Ahmed Vila <av...@devlogic.eu>.
Hi Zhishan,

It's not the interceptor that directs an event toward the channel, but the
channel selector configuration for the source.
Take a look at this topic:
https://flume.apache.org/FlumeUserGuide.html#fan-out-flow

Basically, when you encounter logType=app in your JSON, your interceptor
should emit a header key with a value, e.g. "logType" = "app".

Your configuration is OK; the only thing missing is that header being
emitted by your interceptor. With your selector, you're observing the
header logType and mapping the header value "app" to channel c1:
tier1.sources.KafkaSource.selector.type = multiplexing
tier1.sources.KafkaSource.selector.header = logType
tier1.sources.KafkaSource.selector.mapping.app = c1
tier1.sources.KafkaSource.selector.default = c2

It's worth mentioning that JSON deserialization at this stage is
tempting, but I wouldn't do it because of the performance hit.
Instead, you could use string splitting to get logType's value.

At a slightly higher performance cost than string splitting, but with less
time to a working prototype, I would use the regex extractor interceptor,
which can extract a certain part of the message into a header according to
a regex pattern.
See:
https://flume.apache.org/FlumeUserGuide.html#regex-extractor-interceptor
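
As a rough sketch of that approach (not tested against your setup), the
custom i2 interceptor could be swapped for the built-in regex extractor.
The regex is an assumption: it expects the event body to contain
"logType":"<value>" with straight double quotes and optional whitespace
around the colon. The serializer name s1 is arbitrary; its "name" property
is the header key that the multiplexing selector reads. Backslashes are
doubled because the file is parsed as Java properties.

```
# Assumed replacement for the custom i2 interceptor (illustrative only)
tier1.sources.KafkaSource.interceptors = i1 i2
tier1.sources.KafkaSource.interceptors.i1.type = host
tier1.sources.KafkaSource.interceptors.i1.useIP = true

# Copy the value of "logType" from the event body into a header
tier1.sources.KafkaSource.interceptors.i2.type = regex_extractor
tier1.sources.KafkaSource.interceptors.i2.regex = "logType"\\s*:\\s*"([^"]+)"
tier1.sources.KafkaSource.interceptors.i2.serializers = s1
tier1.sources.KafkaSource.interceptors.i2.serializers.s1.name = logType
```

With this in place, an event whose body contains "logType":"app" should get
the header logType=app and be routed to c1 by the existing selector mapping.
Any other value has no mapping, so those events fall through to the default
channel, c2.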






-- 

Best regards,
Ahmed Vila | Senior software developer
DevLogic | Sarajevo | Bosnia and Herzegovina

Office : +387 33 942 123
Mobile: +387 62 139 348

Website: www.devlogic.eu
E-mail   : avila@devlogic.eu
---------------------------------------------------------------------
This e-mail and any attachment is for authorised use by the intended
recipient(s) only. This email contains confidential information. It should
not be copied, disclosed to, retained or used by, any party other than the
intended recipient. Any unauthorised distribution, dissemination or copying
of this E-mail or its attachments, and/or any use of any information
contained in them, is strictly prohibited and may be illegal. If you are
not an intended recipient then please promptly delete this e-mail and any
attachment and all copies and inform the sender directly via email. Any
emails that you send to us may be monitored by systems or persons other
than the named communicant for the purposes of ascertaining whether the
communication complies with the law and company policies.
