Posted to user@flume.apache.org by Chris Neal <cw...@gmail.com> on 2012/08/02 16:32:37 UTC

Help with FlumeNG and interceptors please

Hello,

I'm using Flume 1.3.0-SNAPSHOT and have a question about interceptors.  I'm
upgrading from 0.9.4 and redoing my configurations to work with NG.  In OG,
I used a decorator to tag each tailDir file with the path it originated
from, so that once in HDFS, the directory structure would mirror the
production system it came from.

For example, my agent to collector looked like this:

exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir",
"MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir")
=> { gzip => autoDFOChain } }'

then my collector to hdfs used the value of "*tailDir*" in the HDFS path:

exec config univac_collector_01 'autoCollectorSource'
 'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-",
3660000, raw)'

I was thinking I could replicate this using interceptors in NG.  On my
source, I define my interceptors as such:

# avro-myagent-source properties
myagent.sources.avro-udprodae09_1-source.type = avro
myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com
myagent.sources.avro-udprodae09_1-source.port = 10000
myagent.sources.avro-udprodae09_1-source.interceptors = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static
myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.value = /mypath/myDir

which I believe is correct. :)  My first question is how to use this static
header within the properties of an hdfs-type sink on another agent.  My hdfs
properties look like this:

# hdfs-myagent2-sink properties
myagent2.sinks.hdfs-myagent2-sink.type = hdfs
myagent2.sinks.hdfs-myagent2-sink.path = hdfs://namenode.domain.com/[somehow put "path" here]
myagent2.sinks.hdfs-myagent2-sink.filePrefix = filename
myagent2.sinks.hdfs-myagent2-sink.rollInterval = 0
myagent2.sinks.hdfs-myagent2-sink.rollSize = 67108864
myagent2.sinks.hdfs-myagent2-sink.rollCount = 0
myagent2.sinks.hdfs-myagent2-sink.batchSize = 100
myagent2.sinks.hdfs-myagent2-sink.codeC = bzip2
myagent2.sinks.hdfs-myagent2-sink.fileType = CompressedStream

My second question is about wire compression between agents.  In OG, I
would use the "gzip" decorator to compress data before sending it from an
agent to a collector.  How do I do something similar in NG?  Does avro-sink
to avro-source do it automagically for me behind the scenes perhaps?

Thanks so much for your time and help!

Re: Help with FlumeNG and interceptors please

Posted by Chris Neal <cw...@gmail.com>.
I was able to answer my first question myself.  Hopefully this will help
someone else who might be wondering the same thing:

You can reference the headers set by interceptors in downstream agents, just
as you could in OG, using %{name}.  Like this:

Agent1:
# avro-udprodae09_1-source properties
#pegslog11.sources.avro-udprodae09_1-source.type = avro
#pegslog11.sources.avro-udprodae09_1-source.bind = pegslog11.domain.com
#pegslog11.sources.avro-udprodae09_1-source.port = 10000
pegslog11.sources.avro-udprodae09_1-source.type = exec
pegslog11.sources.avro-udprodae09_1-source.command = tail -F /path_to_file/MyFile.log
pegslog11.sources.avro-udprodae09_1-source.interceptors = path timestamp filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.key = path
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.value = /path_to_file
pegslog11.sources.avro-udprodae09_1-source.interceptors.timestamp.type = timestamp
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.key = filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.value = MyFile.log

Agent2:
# hdfs-hadoopdn01-sink properties
hadoopdn01.sinks.hdfs-hadoopdn01-sink.type = hdfs
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.path = hdfs://hadoopnn01.domain.com/%{path}
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.filePrefix = %{filename}.%Y-%m-%d
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollInterval = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollSize = 2800072288
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollCount = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.batchSize = 100
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.threadsPoolSize = 128
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollTimerPoolSize = 5
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.codeC = bzip2
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.fileType = CompressedStream

This results in files whose names start with MyFile.log being created under
/path_to_file in HDFS, as I wanted. :)
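
For %{path} and %{filename} to resolve on Agent2, the two agents have to be
connected so that events, with their headers, travel from one to the other.
The config above leaves that hop out. A minimal sketch of it follows, assuming
an Avro sink on Agent1 feeding an Avro source on Agent2. The names
avro-hadoopdn01-sink, avro-hadoopdn01-source and mem-channel, the port, and
the host hadoopdn01.domain.com are made up for illustration and are not from
the original setup.

```properties
# Agent1 (pegslog11): exec source -> memory channel -> Avro sink
# Component names below other than the source are hypothetical.
pegslog11.sources = avro-udprodae09_1-source
pegslog11.channels = mem-channel
pegslog11.sinks = avro-hadoopdn01-sink
pegslog11.channels.mem-channel.type = memory
pegslog11.sources.avro-udprodae09_1-source.channels = mem-channel
pegslog11.sinks.avro-hadoopdn01-sink.type = avro
# Assumed host and port where Agent2's Avro source listens
pegslog11.sinks.avro-hadoopdn01-sink.hostname = hadoopdn01.domain.com
pegslog11.sinks.avro-hadoopdn01-sink.port = 10000
pegslog11.sinks.avro-hadoopdn01-sink.channel = mem-channel

# Agent2 (hadoopdn01): Avro source -> memory channel -> HDFS sink
hadoopdn01.sources = avro-hadoopdn01-source
hadoopdn01.channels = mem-channel
hadoopdn01.sinks = hdfs-hadoopdn01-sink
hadoopdn01.channels.mem-channel.type = memory
hadoopdn01.sources.avro-hadoopdn01-source.type = avro
hadoopdn01.sources.avro-hadoopdn01-source.bind = 0.0.0.0
hadoopdn01.sources.avro-hadoopdn01-source.port = 10000
hadoopdn01.sources.avro-hadoopdn01-source.channels = mem-channel
hadoopdn01.sinks.hdfs-hadoopdn01-sink.channel = mem-channel
```

The interceptors run on Agent1's source, so the path, filename and timestamp
headers are already attached to each event by the time the HDFS sink on
Agent2 expands %{path}, %{filename} and %Y-%m-%d.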

I'm still not sure about the wire compression of the AvroSink to AvroSource
connection.  Maybe I'll dig into the code and see.
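
For reference, later Flume releases (1.4.0 and up, as far as the user guide
indicates) document a compression-type setting on both the Avro sink and the
Avro source, so it may not be available in 1.3.0-SNAPSHOT. A hedged sketch,
assuming a Flume version that has these settings; the sink and source names
are hypothetical, and the value must match on both ends of the hop:

```properties
# Sending agent: compress Avro traffic with deflate
# (avro-hadoopdn01-sink is a made-up sink name)
pegslog11.sinks.avro-hadoopdn01-sink.type = avro
pegslog11.sinks.avro-hadoopdn01-sink.compression-type = deflate
# Optional on the sink; 6 is the documented default level
pegslog11.sinks.avro-hadoopdn01-sink.compression-level = 6

# Receiving agent: must use the same compression-type as the sink
# (avro-hadoopdn01-source is a made-up source name)
hadoopdn01.sources.avro-hadoopdn01-source.type = avro
hadoopdn01.sources.avro-hadoopdn01-source.compression-type = deflate
```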

Chris
