Posted to user@flume.apache.org by Jeremy Custenborder <jc...@gmail.com> on 2012/02/14 20:17:27 UTC

File rollover and small files

Hello All,

I'm working on a POC using Flume to aggregate our log files to S3,
which will later be imported to HDFS and consumed by Hive. Here is my
problem. The web server I'm currently using for the POC is not pushing
much traffic, maybe 3 to 5 requests per second. This is resulting in a
huge number of small files on the web server. I have roll set to
900000, which I thought would generate a file every 15 minutes. I'm
getting files uploaded to S3 anywhere from 5 to 50 seconds apart, and
they are pretty small too, from 600 bytes up. My goal is to have at
most 4 to 6 files per hour.
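
For anyone following along, the expectation above can be checked with a little arithmetic. This is only a sketch: the function names are made up for illustration, and the numbers are the ones quoted in the message (a 900000 ms roll, 3 to 5 requests per second, files arriving 5 to 50 seconds apart).

```python
MILLIS_PER_HOUR = 3_600_000
ROLL_MILLIS = 900_000  # roll interval quoted in the message


def files_per_hour(interval_millis):
    """Files produced per hour if exactly one file is written per interval."""
    return MILLIS_PER_HOUR / interval_millis


def events_per_file(requests_per_second, interval_millis):
    """Approximate events landing in one file at a steady request rate."""
    return requests_per_second * interval_millis / 1000


# Expected behavior with a 15 minute roll.
print(files_per_hour(ROLL_MILLIS))
print(events_per_file(3, ROLL_MILLIS), events_per_file(5, ROLL_MILLIS))

# Observed behavior: a file every 5 to 50 seconds.
print(files_per_hour(50_000), files_per_hour(5_000))
```

A 15 minute roll works out to 4 files per hour with roughly 2700 to 4500 events each, while a file every 5 to 50 seconds works out to somewhere between 72 and 720 files per hour.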

Web Server

source: tailDir("/var/log/apache2/site1/", fileregex="access.log")
sink: value("sitecode", "site1") value("subsitecode", "subsite1") agentDFOSink("collector node",35853)

Collector node

source: collectorSource(35853)
sink: collector(35853) { webLogDecorator() roll(900000) {
  escapedFormatDfs("s3n://<valid s3 bucket>/hive/weblogs_live/dt=%Y-%m-%d/sitecode=%{sitecode}/subsitecode=%{subsitecode}/",
    "file-%{rolltag}", seqfile("snappy")) } }
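
As a rough illustration of how the escape sequences in the collector path above map onto Hive-style partition directories, here is a minimal sketch. It is not Flume's implementation: expand_path is a hypothetical helper, the event date is an assumed example, and the tag values are the ones set by the value() decorators on the web server.

```python
import re
from datetime import datetime

# Path template from the collector sink, with the bucket placeholder left as posted.
TEMPLATE = ("s3n://<valid s3 bucket>/hive/weblogs_live/"
            "dt=%Y-%m-%d/sitecode=%{sitecode}/subsitecode=%{subsitecode}/")


def expand_path(template, when, attrs):
    """Substitute %{tag} from event attributes, then date escapes from the timestamp."""
    with_tags = re.sub(r"%\{(\w+)\}", lambda m: attrs[m.group(1)], template)
    return when.strftime(with_tags)


# Assumed example event: any timestamp on 2012-02-14 gives the same directory.
event_time = datetime(2012, 2, 14, 20, 17, 27)
event_attrs = {"sitecode": "site1", "subsitecode": "subsite1"}

print(expand_path(TEMPLATE, event_time, event_attrs))
```

Under these assumptions every event from this web server on a given day expands to the same directory, so the partition layout by itself should not multiply the number of files within a day.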

Here is what my config looks like.

  <property>
    <name>flume.collector.roll.millis</name>
    <value>900000</value>
    <description>The time (in milliseconds)
    between when hdfs files are closed and a new file is opened
    (rolled).
    </description>
  </property>
  <property>
    <name>flume.agent.logdir.retransmit</name>
    <value>2700000</value>
    <description>The time (in milliseconds)
    between when hdfs files are closed and a new file is opened
    (rolled).
    </description>
  </property>
  <property>
    <name>flume.agent.logdir.maxage</name>
    <value>450000</value>
    <description> number of milliseconds before a local log file is
    considered closed and ready to forward.
    </description>
  </property>
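
To make the millisecond values above easier to compare, here is a small sketch that reads the three properties and converts them to minutes. The <configuration> wrapper element and the helper name are assumptions for illustration; the property names and values are copied from the config above.

```python
import xml.etree.ElementTree as ET

# Property names and values copied from the post.
# The <configuration> root element is an assumption about the surrounding file.
CONFIG = """
<configuration>
  <property><name>flume.collector.roll.millis</name><value>900000</value></property>
  <property><name>flume.agent.logdir.retransmit</name><value>2700000</value></property>
  <property><name>flume.agent.logdir.maxage</name><value>450000</value></property>
</configuration>
"""


def millis_settings_in_minutes(xml_text):
    """Map each property name to its value converted from milliseconds to minutes."""
    root = ET.fromstring(xml_text)
    return {
        prop.findtext("name"): int(prop.findtext("value")) / 60_000
        for prop in root.iter("property")
    }


for name, minutes in millis_settings_in_minutes(CONFIG).items():
    print(f"{name}: {minutes} min")
```

That gives 15 minutes for the collector roll, 45 minutes for retransmit, and 7.5 minutes for maxage, none of which matches the 5 to 50 second spacing reported above.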

I must be missing something. What am I doing wrong?

J

Re: File rollover and small files

Posted by Steve Hoffman <st...@goofy.net>.
Opened a bug: https://issues.apache.org/jira/browse/FLUME-983

Steve

(In reply to Steve Hoffman's message of Mon, Feb 20, 2012 at 3:56 PM.)

Re: File rollover and small files

Posted by Steve Hoffman <st...@goofy.net>.
I noticed this too.
I was using a custom sink based on the AvroDataFileOutputFormat.
I think the problem comes from the sink.flush() call, which calls
sync() in DataFileWriter.
This in turn calls writeBlock(), which runs the data through the compression codec.
I'm not sure where the logic goes wrong, but it seems that calling flush()
after each append keeps the amount of compressible data in each block small,
so the codec doesn't really do much (I saw output about 1/2 the size of the original).
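
The effect described above can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: it uses Python's zlib rather than snappy, it does not use Avro or Flume at all, and the log records are synthetic. It compares compressing every record as its own block (roughly what a flush after each append would force) with compressing all records as one block.

```python
import zlib


def make_records(count):
    """Build synthetic access-log lines (made-up data, for illustration only)."""
    return [
        (
            f'10.0.0.{i % 256} - - [14/Feb/2012:20:17:{i % 60:02d} +0000] '
            f'"GET /page/{i % 50} HTTP/1.1" 200 {500 + i % 100}\n'
        ).encode("ascii")
        for i in range(count)
    ]


def size_block_per_record(records):
    """Total size when every record is compressed as its own block."""
    return sum(len(zlib.compress(record)) for record in records)


def size_single_block(records):
    """Size when all records are compressed together as one block."""
    return len(zlib.compress(b"".join(records)))


records = make_records(1000)
raw_size = sum(len(record) for record in records)

print("raw bytes:            ", raw_size)
print("one block per record: ", size_block_per_record(records))
print("single block:         ", size_single_block(records))
```

The single block comes out far smaller, because the compressor can only exploit repetition inside the block it is given. A block holding one short log line has very little repetition to work with.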

To check whether the data could be made smaller, I ran avro-tools
tojson followed by fromjson (with the snappy codec and the schema I
got from avro-tools getschema) and noticed the resulting file
was about 1/10th the size of the original.

So, in my own output format, I removed that flush() line (in my
format(), after the sink.append()), which caused the size to come out
almost the same as in my tojson/fromjson experiment.

So it seems that flush() should be removed, but I don't know what that
will do to DFO/E2E flows (not that a flush to HDFS really makes data
persistent the way it does on a POSIX filesystem).

Can anybody more familiar with the code say what removing this flush()
will do to the flume event processing/acks?  Is this a bug (do we need
a jira)?

Steve
