Posted to user@flume.apache.org by larryzhang <zh...@gmail.com> on 2013/03/13 04:33:38 UTC
Exec source doesn't flush the last data
Hi,
I did a simple test of the exec source and found that it doesn't flush
the last data. Here are the steps:
*a. create the source file 1.test, which contains the sequence numbers 1
to 15, like this:*
----------
1
2
...
15
----------
*b. create the configuration file flume_simple.conf like this:*
-------------------------
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -n +0 -F
/opt/scripts/tvhadoop/flume/flume-1.3.0/source/1.test
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 10
a1.channels.c1.type = memory
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /opt/scripts/tvhadoop/flume/flume-1.3.0/sink
---------------------
*c. run Flume with this command:*
bin/flume-ng agent --conf conf -f conf/flume_simple.conf
-Dflume.root.logger=DEBUG,console -n a1
After more than 1 minute (the file roll interval), I checked the output
directory. There are 2 files: one has the numbers from 1 to 10, and the
other is empty.
*I think this is because batchSize was set to 10, so the last 5 numbers
never got flushed and were lost.* Even after I apply the patch in
'https://issues.apache.org/jira/browse/FLUME-1819', nothing changes. If
I debug into the code, *I find that the code after the while loop (shown
below) never gets executed*.
----------------
while ((line = reader.readLine()) != null) {
  counterGroup.incrementAndGet("exec.lines.read");
  eventList.add(EventBuilder.withBody(line.getBytes()));
  if (eventList.size() >= bufferCount) {
    channelProcessor.processEventBatch(eventList);
    eventList.clear();
  }
}
// With `tail -F` the stream never hits EOF, so readLine() never returns
// null and this post-loop flush of the partial batch is never reached.
if (!eventList.isEmpty()) {
  channelProcessor.processEventBatch(eventList);
}
--------------
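Since readLine() blocks forever on a live `tail -F` stream, the partial batch has to be flushed on a timer rather than at EOF. Below is a minimal, self-contained sketch of that idea (the direction the FLUME-1819 discussion points toward, with a configurable batch timeout). The class and method names here are illustrative only, not Flume's actual API; in the real source, maybeFlush would be invoked periodically from a scheduled timer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: buffer lines and flush when the batch is full
// OR when a timeout has elapsed since the first buffered line, so a
// partial batch is never stuck waiting for an EOF that never comes.
public class TimedBatchFlusher {
    private final List<String> batch = new ArrayList<>();
    private final List<List<String>> delivered = new ArrayList<>(); // stands in for the channel
    private final int batchSize;
    private final long timeoutNanos;
    private long firstLineNanos;

    public TimedBatchFlusher(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    public void addLine(String line, long nowNanos) {
        if (batch.isEmpty()) {
            firstLineNanos = nowNanos; // start the clock on the first buffered line
        }
        batch.add(line);
        maybeFlush(nowNanos);
    }

    // Flush if the batch is full or the oldest buffered line is too old.
    // In Flume this would also be called from a scheduled timer thread.
    public void maybeFlush(long nowNanos) {
        boolean full = batch.size() >= batchSize;
        boolean timedOut = !batch.isEmpty() && nowNanos - firstLineNanos >= timeoutNanos;
        if (full || timedOut) {
            delivered.add(new ArrayList<>(batch)); // processEventBatch(...) in Flume
            batch.clear();
        }
    }

    public List<List<String>> deliveredBatches() {
        return delivered;
    }

    public static void main(String[] args) {
        TimedBatchFlusher f = new TimedBatchFlusher(10, 3000);
        for (int i = 1; i <= 15; i++) {
            f.addLine(Integer.toString(i), 0L); // all 15 lines arrive at t=0
        }
        System.out.println("batches after input: " + f.deliveredBatches().size());   // 1 (lines 1-10)
        f.maybeFlush(TimeUnit.MILLISECONDS.toNanos(3000)); // timer fires after 3s
        System.out.println("batches after timeout: " + f.deliveredBatches().size()); // 2 (lines 11-15)
    }
}
```

With batchSize 10 and 15 input lines, the first 10 flush immediately and the remaining 5 flush once the timeout fires, which is exactly the batch that gets lost in the test above.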
In my scenario, the source log files are rotated hourly, so I need
to change the file name in the Flume configuration file. Because of the
above bug, I can only set the batchSize of ExecSource to 1, which
significantly slows down the throughput. I wonder how to solve this
problem. Any suggestions are most welcome.
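For the hourly file-name problem specifically, one possible workaround (a sketch only; all paths below are made up for illustration) is to exploit the fact that GNU `tail -F` follows the file *name* and re-opens it when the inode behind that name changes. Pointing the exec command at a fixed symlink and repointing the symlink each hour avoids editing the Flume config at all:

```shell
# Sketch of a symlink-based workaround (paths are illustrative).
# tail -F (--follow=name --retry) re-opens the name when the inode
# changes, so repointing a symlink hourly lets one config line survive
# rotation.
mkdir -p /tmp/flume-demo && cd /tmp/flume-demo
echo "hour 00 data" > app.2013031300.log
ln -sf app.2013031300.log current.log
# The Flume config would then use a single, stable path:
#   a1.sources.r1.command = tail -n +0 -F /tmp/flume-demo/current.log
echo "hour 01 data" > app.2013031301.log
ln -sf app.2013031301.log current.log    # an hourly cron job repoints the link
readlink current.log                     # prints app.2013031301.log
```

Note that this only removes the need to edit the config each hour; it does not fix the partial-batch flush itself, so the tail of each hour's file still needs the batch-timeout fix (or batchSize = 1) to be delivered promptly.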
Best Regards,
larry
Re: Exec source doesn't flush the last data
Posted by Hari Shreedharan <hs...@cloudera.com>.
That is a known issue. I believe there was some feedback on the jira. Unfortunately, it has not been incorporated. I will get to it at some point and fix it :-)
--
Hari Shreedharan
On Tuesday, March 12, 2013 at 8:33 PM, larryzhang wrote:
Re: Exec source doesn't flush the last data
Posted by larryzhang <zh...@gmail.com>.
Just to add one thing: I am using flume-ng 1.3.1.
On 03/13/2013 11:33 AM, larryzhang wrote: