You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Gary Malouf <ma...@gmail.com> on 2014/07/29 16:29:51 UTC

Flume log rolling when you need to do rollups for multiple time zones

We are an ad tech company that buys and sells digital media.  To date, we
have been using Apache Flume 1.4.x to ingest all of our bid request,
response, impression and attribution data.

The logs currently 'roll' hourly for each data type, meaning that at some
point during each hour (if Flume is behaving) the tmp file in HDFS is
closed/renamed with a new one being opened.  This is done for each of 5
running Flume instances.

One problem that has been a challenge to date is effectively bounding our
data queries to make sure we capture all of the data for a given interval
without pulling in the world.  To date, our structure (all in UTC) for each
data type is:

/datatype/yr=2014/mo=06/d=15/{files}

The challenge for us is that Flume is not perfect.

1) It can and will often write data that came in on the new UTC day into
the previous one if that log file has not rolled yet.

2) Since it does not roll perfectly at the top of each hour, we are having
trouble determining the best way to tightly bound a query for data that is
within a few [3-6] hour window properly.

3) When we are doing data rollups in timezones other than UTC, we end up
reading in all of the data for both UTC containing that data to be on the
safe-side.  It would be nice to bound this as described in (2).


One of the major problems affecting the first two cases is that Flume
sometimes gets 'stuck' - that is, the data will hang out in the file
channel for longer than we anticipate.

Anyway, I was just wondering how others have approached these problems to
date.  If not for the edge cases when data can get stuck in Flume, I think
this would be straightforward.

Re: Flume log rolling when you need to do rollups for multiple time zones

Posted by Hari Shreedharan <hs...@apache.org>.
The issue seems to be that it is already the next day when the event
arrives at the agent. You can either move the timestamp interceptor to the
first Flume agent in the pipeline - which reduces the time window in which
this can occur or insert a timestamp header when you create the event (the
header name needs to be "timestamp" with value being
System.currentTimeMillis()). That will guarantee that the event will be in
the correct bucket.

Also, use idleTimeout in the hdfs sink config to reduce the time period a
file waits before it is closed.


On Tue, Jul 29, 2014 at 9:01 AM, Gary Malouf <ma...@gmail.com> wrote:

> Hi Hari,
>
> Below is the config for one of our source-channel-sink combos.  In
> hadoop/spark world, how do you then handle the events that arrive late to
> the bucket?  That is, events for July 15 UTC end up in the July 16 bucket.
>  The ugly way I have been handling this to date is that for any query for a
> particular day I attempt to grab the next UTC day in addition to the
> current one (per my path config in previous email).
>
>
> agent-1.channels.imp-ch1.type = file
> agent-1.channels.imp-ch1.checkpointDir = /opt/flume/imp-ch1/checkpoint
> agent-1.channels.imp-ch1.dataDirs = /opt/flume/imp-ch1/data
> agent-1.channels.imp-ch1.capacity = 100000000
>
> agent-1.sources.avro-imp-source1.channels = imp-ch1
> agent-1.sources.avro-imp-source1.type = avro
> agent-1.sources.avro-imp-source1.bind = 0.0.0.0
> agent-1.sources.avro-imp-source1.port = 41414
>
> agent-1.sources.avro-imp-source1.interceptors = host1 timestamp1
> agent-1.sources.avro-imp-source1.interceptors.host1.type = host
> agent-1.sources.avro-imp-source1.interceptors.host1.useIP = false
> agent-1.sources.avro-imp-source1.interceptors.timestamp1.type = timestamp
>
>
> agent-1.sinks.hdfs-imp-sink1.channel = imp-ch1
> agent-1.sinks.hdfs-imp-sink1.type = hdfs
> agent-1.sinks.hdfs-imp-sink1.hdfs.path =
> hdfs://nn-01:8020/impressions/yr=%Y/mo=%m/d=%d/
> agent-1.sinks.hdfs-imp-sink1.hdfs.filePrefix = %{host}
> agent-1.sinks.hdfs-imp-sink1.hdfs.batchSize = 25
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollInterval = 3600
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollCount = 0
> agent-1.sinks.hdfs-imp-sink1.hdfs.rollSize = 0
>
>
>
>
> On Tue, Jul 29, 2014 at 11:17 AM, Hari Shreedharan <
> hshreedharan@cloudera.com> wrote:
>
>> Can you send your config? There are a couple of params that allow the
>> files to be rolled faster - idleTimeout and rollInterval. I am assuming you
>> are using rollInterval already. idleTimeout will close a file when it is
>> not written to for the configured time. That might help with the rolling.
>> Remember though that if events arrive "late" for a bucket due to failures
>> or network issues, new files will be opened in that bucket if none are
>> currently open.
>>
>>
>> On Tue, Jul 29, 2014 at 7:29 AM, Gary Malouf <ma...@gmail.com>
>> wrote:
>>
>>> We are an ad tech company that buys and sells digital media.  To date,
>>> we have been using Apache Flume 1.4.x to ingest all of our bid request,
>>> response, impression and attribution data.
>>>
>>> The logs currently 'roll' hourly for each data type, meaning that at
>>> some point during each hour (if Flume is behaving) the tmp file in HDFS is
>>> closed/renamed with a new one being opened.  This is done for each of 5
>>> running Flume instances.
>>>
>>> One problem that has been a challenge to date is effectively bounding
>>> our data queries to make sure we capture all of the data for a given
>>> interval without pulling in the world.  To date, our structure (all in UTC)
>>> for each data type is:
>>>
>>> /datatype/yr=2014/mo=06/d=15/{files}
>>>
>>> The challenge for us is that Flume is not perfect.
>>>
>>> 1) It can and will often write data that came in on the new UTC day into
>>> the previous one if that log file has not rolled yet.
>>>
>>> 2) Since it does not roll perfectly at the top of each hour, we are
>>> having trouble determining the best way to tightly bound a query for data
>>> that is within a few [3-6] hour window properly.
>>>
>>> 3) When we are doing data rollups in timezones other than UTC, we end up
>>> reading in all of the data for both UTC containing that data to be on the
>>> safe-side.  It would be nice to bound this as described in (2).
>>>
>>>
>>> One of the major problems affecting the first two cases is that Flume
>>> sometimes gets 'stuck' - that is, the data will hang out in the file
>>> channel for longer than we anticipate.
>>>
>>> Anyway, I was just wondering how others have approached these problems
>>> to date.  If not for the edge cases when data can get stuck in Flume, I
>>> think this would be straightforward.
>>>
>>>
>>
>

Re: Flume log rolling when you need to do rollups for multiple time zones

Posted by Gary Malouf <ma...@gmail.com>.
Hi Hari,

Below is the config for one of our source-channel-sink combos.  In
hadoop/spark world, how do you then handle the events that arrive late to
the bucket?  That is, events for July 15 UTC end up in the July 16 bucket.
 The ugly way I have been handling this to date is that for any query for a
particular day I attempt to grab the next UTC day in addition to the
current one (per my path config in previous email).


agent-1.channels.imp-ch1.type = file
agent-1.channels.imp-ch1.checkpointDir = /opt/flume/imp-ch1/checkpoint
agent-1.channels.imp-ch1.dataDirs = /opt/flume/imp-ch1/data
agent-1.channels.imp-ch1.capacity = 100000000

agent-1.sources.avro-imp-source1.channels = imp-ch1
agent-1.sources.avro-imp-source1.type = avro
agent-1.sources.avro-imp-source1.bind = 0.0.0.0
agent-1.sources.avro-imp-source1.port = 41414

agent-1.sources.avro-imp-source1.interceptors = host1 timestamp1
agent-1.sources.avro-imp-source1.interceptors.host1.type = host
agent-1.sources.avro-imp-source1.interceptors.host1.useIP = false
agent-1.sources.avro-imp-source1.interceptors.timestamp1.type = timestamp


agent-1.sinks.hdfs-imp-sink1.channel = imp-ch1
agent-1.sinks.hdfs-imp-sink1.type = hdfs
agent-1.sinks.hdfs-imp-sink1.hdfs.path =
hdfs://nn-01:8020/impressions/yr=%Y/mo=%m/d=%d/
agent-1.sinks.hdfs-imp-sink1.hdfs.filePrefix = %{host}
agent-1.sinks.hdfs-imp-sink1.hdfs.batchSize = 25
agent-1.sinks.hdfs-imp-sink1.hdfs.rollInterval = 3600
agent-1.sinks.hdfs-imp-sink1.hdfs.rollCount = 0
agent-1.sinks.hdfs-imp-sink1.hdfs.rollSize = 0




On Tue, Jul 29, 2014 at 11:17 AM, Hari Shreedharan <
hshreedharan@cloudera.com> wrote:

> Can you send your config? There are a couple of params that allow the
> files to be rolled faster - idleTimeout and rollInterval. I am assuming you
> are using rollInterval already. idleTimeout will close a file when it is
> not written to for the configured time. That might help with the rolling.
> Remember though that if events arrive "late" for a bucket due to failures
> or network issues, new files will be opened in that bucket if none are
> currently open.
>
>
> On Tue, Jul 29, 2014 at 7:29 AM, Gary Malouf <ma...@gmail.com>
> wrote:
>
>> We are an ad tech company that buys and sells digital media.  To date, we
>> have been using Apache Flume 1.4.x to ingest all of our bid request,
>> response, impression and attribution data.
>>
>> The logs currently 'roll' hourly for each data type, meaning that at some
>> point during each hour (if Flume is behaving) the tmp file in HDFS is
>> closed/renamed with a new one being opened.  This is done for each of 5
>> running Flume instances.
>>
>> One problem that has been a challenge to date is effectively bounding our
>> data queries to make sure we capture all of the data for a given interval
>> without pulling in the world.  To date, our structure (all in UTC) for each
>> data type is:
>>
>> /datatype/yr=2014/mo=06/d=15/{files}
>>
>> The challenge for us is that Flume is not perfect.
>>
>> 1) It can and will often write data that came in on the new UTC day into
>> the previous one if that log file has not rolled yet.
>>
>> 2) Since it does not roll perfectly at the top of each hour, we are
>> having trouble determining the best way to tightly bound a query for data
>> that is within a few [3-6] hour window properly.
>>
>> 3) When we are doing data rollups in timezones other than UTC, we end up
>> reading in all of the data for both UTC containing that data to be on the
>> safe-side.  It would be nice to bound this as described in (2).
>>
>>
>> One of the major problems affecting the first two cases is that Flume
>> sometimes gets 'stuck' - that is, the data will hang out in the file
>> channel for longer than we anticipate.
>>
>> Anyway, I was just wondering how others have approached these problems to
>> date.  If not for the edge cases when data can get stuck in Flume, I think
>> this would be straightforward.
>>
>>
>

Re: Flume log rolling when you need to do rollups for multiple time zones

Posted by Hari Shreedharan <hs...@cloudera.com>.
Can you send your config? There are a couple of params that allow the files
to be rolled faster - idleTimeout and rollInterval. I am assuming you are
using rollInterval already. idleTimeout will close a file when it is not
written to for the configured time. That might help with the rolling.
Remember though that if events arrive "late" for a bucket due to failures
or network issues, new files will be opened in that bucket if none are
currently open.


On Tue, Jul 29, 2014 at 7:29 AM, Gary Malouf <ma...@gmail.com> wrote:

> We are an ad tech company that buys and sells digital media.  To date, we
> have been using Apache Flume 1.4.x to ingest all of our bid request,
> response, impression and attribution data.
>
> The logs currently 'roll' hourly for each data type, meaning that at some
> point during each hour (if Flume is behaving) the tmp file in HDFS is
> closed/renamed with a new one being opened.  This is done for each of 5
> running Flume instances.
>
> One problem that has been a challenge to date is effectively bounding our
> data queries to make sure we capture all of the data for a given interval
> without pulling in the world.  To date, our structure (all in UTC) for each
> data type is:
>
> /datatype/yr=2014/mo=06/d=15/{files}
>
> The challenge for us is that Flume is not perfect.
>
> 1) It can and will often write data that came in on the new UTC day into
> the previous one if that log file has not rolled yet.
>
> 2) Since it does not roll perfectly at the top of each hour, we are having
> trouble determining the best way to tightly bound a query for data that is
> within a few [3-6] hour window properly.
>
> 3) When we are doing data rollups in timezones other than UTC, we end up
> reading in all of the data for both UTC containing that data to be on the
> safe-side.  It would be nice to bound this as described in (2).
>
>
> One of the major problems affecting the first two cases is that Flume
> sometimes gets 'stuck' - that is, the data will hang out in the file
> channel for longer than we anticipate.
>
> Anyway, I was just wondering how others have approached these problems to
> date.  If not for the edge cases when data can get stuck in Flume, I think
> this would be straightforward.
>
>