Posted to user@flume.apache.org by "Cochran, David M (Contractor)" <Da...@bsee.gov> on 2012/09/25 15:06:30 UTC

multiple sources single channel

I'm getting closer, but still doing something wrong..  Do most folks have this hard a time wrapping their heads around the configs or is it just me?

-Dave


host#1 - tailing 2 source log files... sending to host#2 over one channel, to a FILE_ROLL sink to be written out.


HOST#1  Results-
25 Sep 2012 07:40:51,289 WARN  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)  - Configuration for : avroSink has errors, and will be removed:
org.apache.flume.conf.ConfigurationException: Channel fileChannel fileChannel1 not in active set.


HOST#2 Results-
25 Sep 2012 06:45:11,568 WARN  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)  - Configuration for : filesink1 has errors, and will be removed: 
org.apache.flume.conf.ConfigurationException: Channel fileChannel1 not in active set.


Configs for each host follow:


HOST#1-
node105.sources = tailsource tailsource1
node105.channels = fileChannel fileChannel1
node105.sinks = avroSink

node105.sources.tailsource.type = exec
node105.sources.tailsource.command = tail -F /home/XXXX/test.log

node105.sources.tailsource1.type = exec
node105.sources.tailsource1.command = tail -F /home/XXXX/test1.log

node105.sources.tailsource.channels = fileChannel
node105.sources.tailsource1.channels = fileChannel1

## Sink sends avro messages to xxxx on port 9432
node105.sinks.avroSink.type = avro
node105.sinks.avroSink.channel = fileChannel fileChannel1
node105.sinks.avroSink.hostname = 192.168.1.128
node105.sinks.avroSink.port = 9432

node105.channels.fileChannel.type = file
node105.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
node105.channels.fileChannel.dataDirs = /tmp/flume/data2
node105.channels.fileChannel.capacity = 10000
node105.channels.fileChannel.checkpointInterval = 3000
node105.channels.fileChannel.maxFileSize = 5242880

node105.channels.fileChannel1.type = file
node105.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
node105.channels.fileChannel1.dataDirs = /tmp/flume/data22
node105.channels.fileChannel1.capacity = 10000
node105.channels.fileChannel1.checkpointInterval = 3000
node105.channels.fileChannel1.maxFileSize = 5242880




HOST#2 - 
node102.sources = avroSource
node102.channels = fileChannel FileChannel1
node102.sinks = filesink filesink1

## Source listens for avro messages on port 9432 on all ips
node102.sources.avroSource.type = avro
node102.sources.avroSource.channels = fileChannel fileChannel1
node102.sources.avroSource.bind = 0.0.0.0
node102.sources.avroSource.port = 9432

node102.sinks.filesink.type = FILE_ROLL
node102.sinks.filesink.batchSize = 1000
node102.sinks.filesink.channel = fileChannel
node102.sinks.filesink.sink.directory = /logs/rhel5/
node102.sinks.filesink.sink.rollInterval = 86400
node102.sinks.filesink.sink.serializer = TEXT

node102.channels.fileChannel.type = file
node102.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
node102.channels.fileChannel.dataDirs = /tmp/flume/data1
node102.channels.fileChannel.capacity = 5000
node102.channels.fileChannel.checkpointInterval = 45000
node102.channels.fileChannel.maxFileSize = 5242880

node102.sinks.filesink1.type = FILE_ROLL
node102.sinks.filesink1.batchSize = 1000
node102.sinks.filesink1.channel = fileChannel1
node102.sinks.filesink1.sink.directory = /logs/rhel51/
node102.sinks.filesink1.sink.rollInterval = 86400
node102.sinks.filesink1.sink.serializer = TEXT

node102.channels.fileChannel1.type = file
node102.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
node102.channels.fileChannel1.dataDirs = /tmp/flume/data2
node102.channels.fileChannel1.capacity = 5000
node102.channels.fileChannel1.checkpointInterval = 45000
node102.channels.fileChannel1.maxFileSize = 5242880


Re: multiple sources single channel

Posted by Brock Noland <br...@cloudera.com>.
Hi,

There are other ways to do it, but I would:

1) Set up two agents, one on each host: node105 and node102.

2) In each agent, run two separate flows, one for each file. On each
node that means source1, channel1, sink1 & source2, channel2, sink2:

node105.sources.source1.type = exec
node105.sources.source1.command = tail -F file1
node105.sources.source1.channels = channel1

node105.sources.source2.type = exec
node105.sources.source2.command = tail -F file2
node105.sources.source2.channels = channel2

node105.channels.channel1.type = memory
node105.channels.channel2.type = memory


node105.sinks.sink1.type = avro
node105.sinks.sink1.batch-size = 1000
node105.sinks.sink1.channel = channel1
node105.sinks.sink1.hostname = node102
node105.sinks.sink1.port = 8881

node105.sinks.sink2.type = avro
node105.sinks.sink2.batch-size = 1000
node105.sinks.sink2.channel = channel2
node105.sinks.sink2.hostname = node102
node105.sinks.sink2.port = 8882





node102.sources.source1.type = avro
node102.sources.source1.bind = 0.0.0.0
node102.sources.source1.port = 8881
node102.sources.source1.channels = channel1

node102.sources.source2.type = avro
node102.sources.source2.bind = 0.0.0.0
node102.sources.source2.port = 8882
node102.sources.source2.channels = channel2

node102.channels.channel1.type = memory
node102.channels.channel2.type = memory

node102.sinks.sink1.type = FILE_ROLL
node102.sinks.sink1.batchSize = 1000
node102.sinks.sink1.channel = channel1
node102.sinks.sink1.sink.directory = /dir1
node102.sinks.sink1.sink.rollInterval = 86400
node102.sinks.sink1.sink.serializer = TEXT

node102.sinks.sink2.type = FILE_ROLL
node102.sinks.sink2.batchSize = 1000
node102.sinks.sink2.channel = channel2
node102.sinks.sink2.sink.directory = /dir2
node102.sinks.sink2.sink.rollInterval = 86400
node102.sinks.sink2.sink.serializer = TEXT
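
For completeness: Flume's "active set" is simply the set of components
declared on the agent's top-level sources/channels/sinks lines, and every
.channel / .channels reference has to match one of those declared names
exactly, which is what the "not in active set" warnings are complaining
about. A minimal completion of the two files above, reusing the same
component names, might look like:

node105.sources = source1 source2
node105.channels = channel1 channel2
node105.sinks = sink1 sink2

node102.sources = source1 source2
node102.channels = channel1 channel2
node102.sinks = sink1 sink2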

On Thu, Sep 27, 2012 at 11:37 AM, Cochran, David M (Contractor)
<Da...@bsee.gov> wrote:
> Hi Brock,
>
> I hope I'm not becoming a total pain..  I understand your example
> (actually your example from last week got me there easily).
> The current config you sent shows the generated logs ending up in a
> local directory on the same box.  My trouble is understanding how to get
> the two tail-sources on the machine generating the logs over to another
> machine that acts as the permanent home for the logs, so they end up
> in their own directories, within one running instance of flume on the
> receiving machine.  I made a real mess of things and got the
> two source files to end up as one combined file on the other side.
> I just couldn't figure out how to split them apart.
>
> So far I've only been able to get the 2 files from the source machine to
> the remote machine in this manner: one instance of flume runs on the
> source and sends one log file to the receiving machine on one port and
> the other log file on another port, while the receiving machine runs two
> instances of flume, each listening on a different port and writing the
> data it receives out to a set directory.
>
> Now while this cobbled-together method works, it will not scale up well
> at all.... with several servers each sending a few log files to the
> server that will be the final resting place for the logs.  Dozens of flume
> instances running on the receiving server across a myriad of ports =
> disaster.
>
> Perhaps this confuses the matter further.
>
> -Dave
>
>
>
> -----Original Message-----
> From: Brock Noland [mailto:brock@cloudera.com]
> Sent: Thursday, September 27, 2012 9:46 AM
> To: Cochran, David M (Contractor)
> Subject: Re: multiple sources single channel
>
> This can all be in the same agent.
>
> node105.sources.source1.type = exec
> node105.sources.source1.command = tail -F file1
> node105.sources.source1.channels = channel1
>
> node105.sources.source2.type = exec
> node105.sources.source2.command = tail -F file2
> node105.sources.source2.channels = channel2
>
> node105.channels.channel1.type = memory
> node105.channels.channel2.type = memory
>
> node105.sinks.sink1.type = FILE_ROLL
> node105.sinks.sink1.batchSize = 1000
> node105.sinks.sink1.channel = channel1
> node105.sinks.sink1.sink.directory = /dir1
> node105.sinks.sink1.sink.rollInterval = 86400
> node105.sinks.sink1.sink.serializer = TEXT
>
> node105.sinks.sink2.type = FILE_ROLL
> node105.sinks.sink2.batchSize = 1000
> node105.sinks.sink2.channel = channel2
> node105.sinks.sink2.sink.directory = /dir2
> node105.sinks.sink2.sink.rollInterval = 86400
> node105.sinks.sink2.sink.serializer = TEXT
>
>
>
> On Thu, Sep 27, 2012 at 7:47 AM, Brock Noland <br...@cloudera.com>
> wrote:
>> Ok, I'll send you an example later today.
>>
>> --
>> Brock Noland
>> Sent with Sparrow
>>
>> On Thursday, September 27, 2012 at 7:28 AM, Cochran, David M
>> (Contractor)
>> wrote:
>>
>> What I'd like is for each file to end up in its own directory.
>>
>> -Dave
>>
>> -----Original Message-----
>> From: Brock Noland [mailto:brock@cloudera.com]
>> Sent: Wednesday, September 26, 2012 3:51 PM
>> To: Cochran, David M (Contractor)
>> Subject: Re: multiple sources single channel
>>
>> OK you have two tail sources. What is the end result you are looking
>> for?
>>
>> 1) Each file which is being tailed ends up in its own directory
>> 2) Both files end up in the same directory
>> 3) something else, if so pls explain
>>
>> Brock
>>
>> On Wed, Sep 26, 2012 at 2:50 PM, Cochran, David M (Contractor)
>> <Da...@bsee.gov> wrote:
>>
>> Hi Brock,
>>
>> So, are you saying that two sources can feed one channel to the sink, and
>> the sink can somehow split the stream back out into two files for
>> storage?
>>
>> (I tried this but either it's not possible or my config isn't quite
>> right)
>>
>> Or do I need to create a separate channel (each on their own port??) (and
>> a separate instance of flume???) for each file I want to store on the
>> sink?
>>
>> -Dave
>>
>>
>> -----Original Message-----
>> From: Brock Noland [mailto:brock@cloudera.com]
>> Sent: Tuesday, September 25, 2012 9:36 AM
>> To: user@flume.apache.org
>> Subject: Re: multiple sources single channel
>>
>> Hi,
>>
>> A source can write to multiple channels but a sink can only read from
>> a single channel. So in this case, you probably just want one channel
>> and then have both sources write to that single channel. Notice that
>> for sources it's ".channels = " while for sinks it's ".channel = "
>>
>> Cheers,
>> Brock
>>
>> On Tue, Sep 25, 2012 at 8:06 AM, Cochran, David M (Contractor)
>> <Da...@bsee.gov> wrote:
>>
>>
>> I'm getting closer, but still doing something wrong.. Do most folks
>> have this hard a time wrapping their heads around the configs or is it
>> just me?
>>
>>
>> -Dave
>>
>>
>> host#1 - tailing 2 source log files... send to host#2 over one channel
>> to sink FILE_ROLL to be written out.
>>
>>
>> HOST#1 Results-
>> 25 Sep 2012 07:40:51,289 WARN [conf-file-poller-0]
>> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
>> - Configuration for : avroSink has errors, and will be removed:
>> org.apache.flume.conf.ConfigurationException: Channel fileChannel
>> fileChannel1 not in active set.
>>
>>
>> HOST#2 Results-
>> 25 Sep 2012 06:45:11,568 WARN [conf-file-poller-0]
>> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
>> - Configuration for : filesink1 has errors, and will be removed:
>> org.apache.flume.conf.ConfigurationException: Channel fileChannel1 not
>> in active set.
>>
>>
>> Config for each host follow:
>>
>>
>> HOST#1-
>> node105.sources = tailsource tailsource1
>> node105.channels = fileChannel fileChannel1
>> node105.sinks = avroSink
>>
>> node105.sources.tailsource.type = exec
>> node105.sources.tailsource.command = tail -F /home/XXXX/test.log
>>
>> node105.sources.tailsource1.type = exec
>> node105.sources.tailsource1.command = tail -F /home/XXXX/test1.log
>>
>> node105.sources.tailsource.channels = fileChannel
>> node105.sources.tailsource1.channels = fileChannel1
>>
>> ## Sink sends avro messages to xxxx on port 9432
>> node105.sinks.avroSink.type = avro
>> node105.sinks.avroSink.channel = fileChannel fileChannel1
>> node105.sinks.avroSink.hostname = 192.168.1.128
>> node105.sinks.avroSink.port = 9432
>>
>> node105.channels.fileChannel.type = file
>> node105.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
>> node105.channels.fileChannel.dataDirs = /tmp/flume/data2
>> node105.channels.fileChannel.capacity = 10000
>> node105.channels.fileChannel.checkpointInterval = 3000
>> node105.channels.fileChannel.maxFileSize = 5242880
>>
>> node105.channels.fileChannel1.type = file
>> node105.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
>> node105.channels.fileChannel1.dataDirs = /tmp/flume/data22
>> node105.channels.fileChannel1.capacity = 10000
>> node105.channels.fileChannel1.checkpointInterval = 3000
>> node105.channels.fileChannel1.maxFileSize = 5242880
>>
>>
>>
>>
>> HOST#2 -
>> node102.sources = avroSource
>> node102.channels = fileChannel FileChannel1
>> node102.sinks = filesink filesink1
>>
>> ## Source listens for avro messages on port 9432 on all ips
>> node102.sources.avroSource.type = avro
>> node102.sources.avroSource.channels = fileChannel fileChannel1
>> node102.sources.avroSource.bind = 0.0.0.0
>> node102.sources.avroSource.port = 9432
>>
>> node102.sinks.filesink.type = FILE_ROLL
>> node102.sinks.filesink.batchSize = 1000
>> node102.sinks.filesink.channel = fileChannel
>> node102.sinks.filesink.sink.directory = /logs/rhel5/
>> node102.sinks.filesink.sink.rollInterval = 86400
>> node102.sinks.filesink.sink.serializer = TEXT
>>
>> node102.channels.fileChannel.type = file
>> node102.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
>> node102.channels.fileChannel.dataDirs = /tmp/flume/data1
>> node102.channels.fileChannel.capacity = 5000
>> node102.channels.fileChannel.checkpointInterval = 45000
>> node102.channels.fileChannel.maxFileSize = 5242880
>>
>> node102.sinks.filesink1.type = FILE_ROLL
>> node102.sinks.filesink1.batchSize = 1000
>> node102.sinks.filesink1.channel = fileChannel1
>> node102.sinks.filesink1.sink.directory = /logs/rhel51/
>> node102.sinks.filesink1.sink.rollInterval = 86400
>> node102.sinks.filesink1.sink.serializer = TEXT
>>
>> node102.channels.fileChannel1.type = file
>> node102.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
>> node102.channels.fileChannel1.dataDirs = /tmp/flume/data2
>> node102.channels.fileChannel1.capacity = 5000
>> node102.channels.fileChannel1.checkpointInterval = 45000
>> node102.channels.fileChannel1.maxFileSize = 5242880
>>
>>
>>
>>
>> --
>> Apache MRUnit - Unit testing MapReduce -
>> http://incubator.apache.org/mrunit/
>>
>>
>>
>>
>> --
>> Apache MRUnit - Unit testing MapReduce -
>> http://incubator.apache.org/mrunit/
>>
>>
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce -
> http://incubator.apache.org/mrunit/



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: multiple sources single channel

Posted by Brock Noland <br...@cloudera.com>.
Hi,

A source can write to multiple channels but a sink can only read from
a single channel. So in this case, you probably just want one channel
and then have both sources write to that single channel. Notice that
for sources it's ".channels = " while for sinks it's ".channel = "
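
A minimal sketch of that shape, with made-up agent and component names
and the source/sink/channel type lines omitted:

agent1.sources = src1 src2
agent1.channels = ch1
agent1.sinks = snk1

# sources use ".channels" (plural) and may fan out to several channels
agent1.sources.src1.channels = ch1
agent1.sources.src2.channels = ch1

# a sink uses ".channel" (singular) and drains exactly one channel
agent1.sinks.snk1.channel = ch1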

Cheers,
Brock

On Tue, Sep 25, 2012 at 8:06 AM, Cochran, David M (Contractor)
<Da...@bsee.gov> wrote:
>
> I'm getting closer, but still doing something wrong..  Do most folks have
> this hard a time wrapping their heads around the configs or is it just me?
>
> -Dave
>
>
> host#1 - tailing 2 source log files... send to host#2 over one channel to
> sink FILE_ROLL to be written out.
>
>
> HOST#1  Results-
> 25 Sep 2012 07:40:51,289 WARN  [conf-file-poller-0]
> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
> - Configuration for : avroSink has errors, and will be removed:
> org.apache.flume.conf.ConfigurationException: Channel fileChannel
> fileChannel1 not in active set.
>
>
> HOST#2 Results-
> 25 Sep 2012 06:45:11,568 WARN  [conf-file-poller-0]
> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
> - Configuration for : filesink1 has errors, and will be removed:
> org.apache.flume.conf.ConfigurationException: Channel fileChannel1 not in
> active set.
>
>
> Config for each host follow:
>
>
> HOST#1-
> node105.sources = tailsource tailsource1
> node105.channels = fileChannel fileChannel1
> node105.sinks = avroSink
>
> node105.sources.tailsource.type = exec
> node105.sources.tailsource.command = tail -F /home/XXXX/test.log
>
> node105.sources.tailsource1.type = exec
> node105.sources.tailsource1.command = tail -F /home/XXXX/test1.log
>
> node105.sources.tailsource.channels = fileChannel
> node105.sources.tailsource1.channels = fileChannel1
>
> ## Sink sends avro messages to xxxx on port 9432
> node105.sinks.avroSink.type = avro
> node105.sinks.avroSink.channel = fileChannel fileChannel1
> node105.sinks.avroSink.hostname = 192.168.1.128
> node105.sinks.avroSink.port = 9432
>
> node105.channels.fileChannel.type = file
> node105.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
> node105.channels.fileChannel.dataDirs = /tmp/flume/data2
> node105.channels.fileChannel.capacity = 10000
> node105.channels.fileChannel.checkpointInterval = 3000
> node105.channels.fileChannel.maxFileSize = 5242880
>
> node105.channels.fileChannel1.type = file
> node105.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
> node105.channels.fileChannel1.dataDirs = /tmp/flume/data22
> node105.channels.fileChannel1.capacity = 10000
> node105.channels.fileChannel1.checkpointInterval = 3000
> node105.channels.fileChannel1.maxFileSize = 5242880
>
>
>
>
> HOST#2 -
> node102.sources = avroSource
> node102.channels = fileChannel FileChannel1
> node102.sinks = filesink filesink1
>
> ## Source listens for avro messages on port 9432 on all ips
> node102.sources.avroSource.type = avro
> node102.sources.avroSource.channels = fileChannel fileChannel1
> node102.sources.avroSource.bind = 0.0.0.0
> node102.sources.avroSource.port = 9432
>
> node102.sinks.filesink.type = FILE_ROLL
> node102.sinks.filesink.batchSize = 1000
> node102.sinks.filesink.channel = fileChannel
> node102.sinks.filesink.sink.directory = /logs/rhel5/
> node102.sinks.filesink.sink.rollInterval = 86400
> node102.sinks.filesink.sink.serializer = TEXT
>
> node102.channels.fileChannel.type = file
> node102.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
> node102.channels.fileChannel.dataDirs = /tmp/flume/data1
> node102.channels.fileChannel.capacity = 5000
> node102.channels.fileChannel.checkpointInterval = 45000
> node102.channels.fileChannel.maxFileSize = 5242880
>
> node102.sinks.filesink1.type = FILE_ROLL
> node102.sinks.filesink1.batchSize = 1000
> node102.sinks.filesink1.channel = fileChannel1
> node102.sinks.filesink1.sink.directory = /logs/rhel51/
> node102.sinks.filesink1.sink.rollInterval = 86400
> node102.sinks.filesink1.sink.serializer = TEXT
>
> node102.channels.fileChannel1.type = file
> node102.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
> node102.channels.fileChannel1.dataDirs = /tmp/flume/data2
> node102.channels.fileChannel1.capacity = 5000
> node102.channels.fileChannel1.checkpointInterval = 45000
> node102.channels.fileChannel1.maxFileSize = 5242880
>



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/