Posted to user@flume.apache.org by Steve Johnson <st...@webninja.com> on 2012/09/05 18:37:40 UTC

Some test findings and insight appreciated

Hi all, I wanted to share some of my test findings, concerns, etc.  First
off, I apologize for this being so verbose, but I feel I need to give a
little bit of background on our setup and needs to show the big
picture.  Please ignore this if you're not interested; you've been warned...

But if you are, great, because I do have some valid questions to follow and
am really looking forward to any constructive comments.

Prior to a few weeks back, I had zero experience with Flume.  I'd been
familiar with its existence for some time (about a year), but nothing more
than that.

My company generates about 8 billion log records per day, spread across 5
datacenters, with about 200 servers in each location.  So about 1.6 billion
per day in each cage.  We're growing and shooting to increase that to about
30 billion per day based on holiday traffic growth and our company's growth.
These log records are currently hourly-rotated logback (slf4j) generated
logs from our Java applications, containing tab-delimited ASCII data of
various widths.  There are probably 25 different log types we collect, but
they're generally all the same format; some average record lengths of 50-60
bytes, while some others average 1k in width.

Right now, we collect them using a custom-built Java scheduling
application.  We have a machine dedicated to this at each DC.  This box
fires off some hourly jobs (within minutes after log rotation) that pull
all the logs from the 200+ servers (some servers generate up to 10
different log types per hour), uncompressed.  We used to pull directly to
our central location and would initiate compression on the servers
themselves, but this generated CPU/IO spikes every hour that were
causing performance issues.  So we put a remote machine in each node to
handle local collection.  They pull all the log files locally first, then
compress, then move them into a queue.  This happens across all 5 DCs in
parallel.  We have another set of schedulers in our central location that
then collect from those remote nodes.  We pull the files locally, then do
some ETL work and load the raw log data into our Greenplum warehouse for
nightly aggregations and analysis.

This is obviously becoming very cumbersome to maintain, as we have, right
now, 10 different schedulers running over 6 locations.  Also, to guarantee
we've fetched every log file, and also to guarantee we haven't double-loaded
any raw data (this data has only a logrec that's maintained globally to
guarantee uniqueness, so removing dupes is a nightmare, and we like to avoid
that), we have to track every file pickup for each hop (currently tracked
in a PostgreSQL db) and then use that for validation and also to make sure
we don't pull a rotated log again (logs stay archived on their original
servers for 7 days).

A couple of years back, when we had 1 or even 2 DCs with only about 30
servers in each, this wasn't so bad.  But as you can imagine, we're now
looking at over 80k files generated per day to track and manage.  When
things run smoothly, it's great; when we have issues, it's a pain to dig
into.

So what are the requirements I'm looking at for a replacement of said
system?

1. Little or no custom configuration; it must be a drop-in-and-go
environment.  Right now, as we add/remove servers, I have to edit a lot of
db records to tell the schedulers which servers have which types of logs; I
also need to replicate it out, reload the configs, and make sure log
sources have ssh keys installed, etc.
2. Must be able to compress data going between the Flume agents in remote
DCs and the Flume agents in our central location (bandwidth for this kind
of data is not cheap; right now, by gzipping the hourly logs locally before
we transfer, we get between about 6:1 and 10:1 compression depending on the
log type).  See the sketch after this list for what I have in mind here.
3. Must be able to handle the throughput.
4. Must be transactional and recoverable; many of these logs correlate
directly to revenue, and we must not lose data.
5. Scalable.
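
For point 2, here's a rough sketch of the kind of compressed hop I have in
mind between a remote-DC agent and our central agent (hostnames, ports, and
paths are made up, and the compression-type setting on the Avro sink/source
appears to be a post-1.2.0 feature, so treat this purely as illustrative):

# --- remote-DC agent ---
remote.sources = localIn
remote.channels = fileCh
remote.sinks = toCentral

remote.sources.localIn.type = exec
remote.sources.localIn.command = tail -F /var/log/app/current.log
remote.sources.localIn.channels = fileCh

remote.channels.fileCh.type = file

remote.sinks.toCentral.type = avro
remote.sinks.toCentral.channel = fileCh
remote.sinks.toCentral.hostname = central.example.com
remote.sinks.toCentral.port = 4545
remote.sinks.toCentral.batch-size = 500
remote.sinks.toCentral.compression-type = deflate

# --- central agent ---
central.sources = fromRemote
central.channels = fileCh
central.sinks = toStaging

central.sources.fromRemote.type = avro
central.sources.fromRemote.channels = fileCh
central.sources.fromRemote.bind = 0.0.0.0
central.sources.fromRemote.port = 4545
central.sources.fromRemote.compression-type = deflate

central.channels.fileCh.type = file

central.sinks.toStaging.type = FILE_ROLL
central.sinks.toStaging.channel = fileCh
central.sinks.toStaging.sink.directory = /data/flume/staging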

From reading the docs, I believe Flume is a possible solution.

Forward to today...

Flume Agent Config:
Version flume-ng 1.2.0:
JAVA_OPTS="-Xms1g -Xmx5g -Dcom.sun.management.jmxremote
-XX:MaxDirectMemorySize=5g"
This is running on a 16-core Intel Xeon 2.4GHz with 48GB RAM, and local
drives running RAID5/XFS (not sure of the RPMs, but they're pretty fast).


Testing/Flume Setup:
testagent.sources = netcatSource
testagent.channels = memChannel
testagent.sinks = fileSink

testagent.sources.netcatSource.type = netcat
testagent.sources.netcatSource.channels = memChannel
testagent.sinks.fileSink.type = FILE_ROLL
testagent.sinks.fileSink.channel = memChannel
testagent.channels.memChannel.type = memory

testagent.sources.netcatSource.bind = 0.0.0.0
testagent.sources.netcatSource.port = 6172
testagent.sources.netcatSource.max-line-length = 65536
testagent.channels.memChannel.capacity = 4294967296
testagent.sinks.fileSink.sink.directory = /opt/dotomi/flume-data/sink/file_roll
testagent.sinks.fileSink.sink.rollInterval = 0

I took one of our production servers' hourly logs, one of the largest we
produce (this one has about 1.2 million rows in it for that hour, average
record length about 700 bytes, some creeping up to 4k.  Keep in mind, this
is one server in one cage out of 50 total).

I wrote a Perl script that opens a socket to the netcat source port on the
agent, buffers about 10 log records, and then sends them in batches of 10.
I originally tried line-by-line, which was obviously super inefficient.  I
also attempted to buffer more (more on that below) but started dropping too
many events; I think it was causing buffering issues on the agent.  10
seemed to be the magic number for my setup.  I also started with a
FileChannel (recoverable) and a simple file_roll sink so I could verify
the output files.
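
For reference, here's roughly what that script does, re-sketched in Python
for illustration (host, port, and pacing are just my test values; the real
script is Perl):

import socket
import sys
import time

HOST, PORT = "127.0.0.1", 6172   # the netcat source from the config above
BATCH = 10                       # lines per send; 10 was the sweet spot here

def send_file(path):
    sock = socket.create_connection((HOST, PORT))
    buf = []
    with open(path, "rb") as f:
        for line in f:
            buf.append(line.rstrip(b"\n"))
            if len(buf) == BATCH:
                sock.sendall(b"\n".join(buf) + b"\n")
                sock.recv(4096)      # drain (some of) the per-event "OK" acks
                buf = []
                time.sleep(0.001)    # crude pacing between batches
        if buf:                      # flush any trailing partial batch
            sock.sendall(b"\n".join(buf) + b"\n")
            sock.recv(4096)
    sock.close()

if __name__ == "__main__":
    send_file(sys.argv[1])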

I ended up having some trouble getting the FileChannel started.  I
eventually got it to start with some pretty narrow parameters, which caused
my flow to be very slow.  When I tried to set higher numbers in capacity,
it would either not start up, or start but nothing would flow.  I ended up
moving to a memory channel just to get my proof of concept moving, and to
get a test of the framework first.  Also, since we're a Java shop, we're
not opposed to the idea of writing custom sources/sinks/channels where need
be, assuming the framework is sound.

After some heavy tuning, I was able to get something that worked well and
performed very well.  I was eventually able to push 200k of these log
events per second through.

To cut to the chase, here are some issues I had:

1. Data loss (this was brought up in another thread).  About every other
run (a little less, ~40% of the time) of the exact same test would drop a
very small number of events, 10 or fewer (out of about 500k events).  Other
times it would pass every event through without issue.
2. Looking at my tunings, I was able to get about 60k per second on a
single Flume instance with the above-mentioned tuning.  I decided to crank
everything up (double it, and even tried doubling that once more).  This
machine has 48GB and is doing nothing but this.  So logically, I figured I
could bump my OPTS to 10g instead of 5 and up my channel capacity to 8g,
allowing me to buffer more and in theory double my throughput.  This wasn't
at all the case: when I attempted to throw more at it (either by lowering
my sleep times between batches, or even using the same sleep times but
doubling my batch size from 10 lines to 20), things started flaking out.
Basically, after about 200k lines went through, it just stopped
processing: no warnings, no errors, nothing.
Here's where it gets interesting, though.  I then set up four Flume agents
on the same machine with all the same configurations and startup params
back at the 4g range, all listening on different ports.  I started all 4,
and then in parallel (on another machine) ran my test script to hit all
four agents.  That's when I was able to get 200k through.  So by running
four of them with lower tunables, I was able to get the throughput I
couldn't get running one with 4x the tunables and startup options.


Number 2 is something I can easily live with, but I would like to hear some
insight on what might be causing it.  Obviously the disks can keep up,
because all of the file_roll paths for all 4 agents are using the same
drive.  And obviously I have the RAM to buffer accordingly.  But for some
reason, one agent with 2x or even 4x the juice starts getting flaky.

Number 1 is more concerning; this obviously will need to be solved.

In summary, I'm willing and ready to spend more time on this.  But I wanted
to get some insight from the pros and developers here, and also to make
sure I'm not crazy and maybe just trying to use this for more than it was
designed for.

Many thanks to anyone who stuck around to read this! :)


 Cheers

-- 
Steve Johnson
steve@webninja.com

Re: Some test findings and insight appreciated

Posted by Juhani Connolly <ju...@cyberagent.co.jp>.
Hi Steve,

Thanks for the rundown, lots of detail is better than not enough! 
Response inline

On 09/06/2012 01:37 AM, Steve Johnson wrote:
> Hi all, I wanted to share some of my test findings/concerns, etc.. 
> First off, I apologize for this being so verbose, but I feel I need to 
> give a little bit of a background into our setup and needs to show the 
> big picture.  Please ignore this if your not interested, you've been 
> warned...
>
> But if you are, great, cause I do have some valid questions to follow 
> and really looking forward to any constructive comments.
>
> Prior to a few weeks back, I have zero experience with Flume.  Have 
> been familiar with its existence for some time (about a year) but 
> nothing more than that.
>
> My company generates about 8 billion log records per day, spread 
> across 5 datacenters, with about 200 servers in each location.  So 
> about 1.6 billion per day in each cage.  We're growing and shooting to 
> increase that to about 30 billion per day based on holiday traffic 
> growth and our company's growth.  These log records are currently 
> hourly rotated logback(slf4j) generated logs from our java 
> applications, containing tab delimited ascii data of various widths. 
>  There's probably 25 different log types we collect, but generally all 
> the same format, some average record lengths of 50-60 bytes, while 
> some others average 1k in width.
>
> Right now, we collect them using a custom built java scheduling 
> application.  We have a machine dedicated to this at each DC.  This 
> box fires off some hourly jobs (within minutes after log rotations) 
> that pulls all the logs from the 200+ servers (some servers generate 
> up to 10 different log types per hour), uncompressed.  We used to pull 
> directly to our central location, and would initiate compression on 
> the servers themselves, but this generated CPU/IO spikes every hour 
> that were causing performance issues.  So we put a remote machine in 
> each node to handle local collection.  They pull all the logs files 
> locally first, then compress, then move into a queue.  This happens 
> across all 5 dc's in parallel.  We have another set of schedulers in 
> our central location that then each collect from those remote nodes. 
>  Pull them locally, then we do some ETL work and load the raw log data 
> into our Greenplum warehouse for nightly aggregations and analysis.
>
> This is obviously becoming very cumbersome to maintain, as we have 
> right now, 10 different schedulers running over 6 locations.  Also, to 
> guarantee we've fetched every log file, and also to guarantee we 
> haven't double-loaded any raw data (this data has only a logrec that's 
> maintained globally to guarantee uniqueness, so removing dupes is a 
> nightmare, so we like to avoid that), we have to track every file 
> pickup, for each hop (currently tracked in a postgresql db) and then 
> use that for validation and to also make sure we don't pull a rotated 
> log again (logs stay archived on their original servers for 7 days).
>
> A couple years back when we had 1 or even 2 dc's with only about 30 
> servers in each, this wasn't so bad.  But as you can imagine, we're 
> looking at over 80k files generated per day to track and manage.  When 
> things  run smooth, it's great, when we have issues, it's a pain to 
> dig into.
>
> So what are the requirements I'm looking at for a replacement of said 
> system?
>
> 1. Less, or no custom configuration, must be drop-in and go 
> environment, right now, as we add/remove servers, I have to edit a lot 
> of db records to tell the schedulers which servers have which types of 
> logs, I also need to replicate it out, reload the configs and make 
> sure log sources have ssh keys installed, etc.
> 2. Must be able to compress data going between flume agents in remote 
> DC's to the flume agents in our central location. (bandwidth for this 
> kind of data is not cheap, right now by gzipping the hourly logs 
> locally before we transfer, we get between about  6:1 to a 10:1 
> compression ratio depending on the log type.
> 3. Must be able to handle the throughput.
> 4. Must be transactional and recoverable, many of these logs 
> correlate directly to revenue, we must not lose data.
To have zero data loss you must use a reliable ingest system and a
lossless channel.  The netcat source can't guarantee delivery (if a channel
can't fit the sent messages, for example, they will just get dropped), and
the memory channel will lose data on a crash.

> 5. Scalable.
>
> From reading the docs, I believe Flume is a possible solution.
>
> Forward to today...
>
> Flume Agent Config:
> Version flume-ng 1.2.0:
> JAVA_OPTS="-Xms1g -Xmx5g -Dcom.sun.management.jmxremote 
> -XX:MaxDirectMemorySize=5g"
> This is running on a 16 core Intel Xeon 2.4ghz with 48Gb ram, and 
> local drives running raid5/xfs(not sure of the rpm's, but they're 
> pretty fast).
>
>
> Testing/Flume Setup:
> testagent.sources = netcatSource
> testagent.channels = memChannel
> testagent.sinks = fileSink
>
> testagent.sources.netcatSource.type = netcat
> testagent.sources.netcatSource.channels = memChannel
> testagent.sinks.fileSink.type = FILE_ROLL
> testagent.sinks.fileSink.channel = memChannel
> testagent.channels.memChannel.type = memory
>
> testagent.sources.netcatSource.bind = 0.0.0.0
> testagent.sources.netcatSource.port = 6172
> testagent.sources.netcatSource.max-line-length = 65536
> testagent.channels.memChannel.capacity = 4294967296
This is huge.  The memory channel uses a blocking queue of events, and I'm
pretty sure it will misbehave beyond the limits of the integer range.
Seeing as it's signed, that would be around 2 billion (and with an average
event length of, say, 50 bytes, that would consume at least 100GB of RAM).
FileChannel may or may not deal with huge capacities better.  The capacity
setting is an event count, not bytes of data.  Someone did, however,
recently post an issue about making physical size a setting in some form;
maybe you want to add your feedback to that
(https://issues.apache.org/jira/browse/FLUME-1535).
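
For example, something more like this would be a typical sizing (numbers
are purely illustrative; the point is that both settings count events, not
bytes):

# memory channel sized in events, not bytes
testagent.channels.memChannel.type = memory
testagent.channels.memChannel.capacity = 1000000
testagent.channels.memChannel.transactionCapacity = 1000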

> testagent.sinks.fileSink.sink.directory = 
> /opt/dotomi/flume-data/sink/file_roll
> testagent.sinks.fileSink.sink.rollInterval = 0
>
> I took one of our production servers hourly logs, one of the largest 
> we produce, (this one has about 1.2million rows in it for that hour, 
> average record length about 700 bytes, some creeping up to 4k.  Keep 
> in mind, this is one server in one cage out of 50 total).
>
> I wrote a Perl script that opens a socket to the NetCat source port on 
> the agent, and buffers about 10 log recs and then sends them in 
> batches of 10.  I originally tried line-by-line, this 
> was obviously super inefficient.  I also attempted more (more on that 
> below) to buffer but started dropping too many events, i think it was 
> causing buffering issues on the agent.  10 seemed to be the magic 
> number for my setup.  I also started with a FileChannel (recoverable), 
> and a simple file_roll sink so i could verify the output files.

As mentioned above, the netcat source is not a reliable ingest mechanism,
as it doesn't know about events that weren't committed.  In the long run,
for lossless delivery, you will want to deliver data via either Avro or the
Scribe (Thrift) format.  However, if you just want to test throughput, try
using an exec source to tail the log files and fiddle with the batching
settings, along the lines of the sketch below.
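
A minimal sketch of what I mean (the path is made up, and I believe the
batchSize knob arrived after 1.2.0, so check the user guide for your
version):

# tail a log file into the existing memChannel
testagent.sources = tailSource
testagent.sources.tailSource.type = exec
testagent.sources.tailSource.command = tail -F /var/log/app/current.log
testagent.sources.tailSource.channels = memChannel
# commit events to the channel in batches rather than one at a time
testagent.sources.tailSource.batchSize = 100
# note: an exec source still can't replay data if the agent dies, so this
# is for throughput testing rather than the final lossless setup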
>
> I ended up having some troubles getting the FileChannel started.  I 
> ended up getting it to start with some pretty narrow parameters which 
> caused my flow to be very slow.  When I tried to set higher numbers in 
> capacity, it would either not start up, or start but nothing would 
> flow.   I ended up moving to a memory channel just to get my proof of 
> concept moving, and to get a test of the framework first.  Also, since 
> we're a java shop, we're not opposed to the idea of writing custom 
> sources/sinks/channels where need be, assuming the framework is sound.

In its current implementation the FileChannel is lossless and thus causes a
disk flush (which generally writes two separate files) for every commit
(one for every batch of events).  This is going to mean very slow
throughput if you have small batches.  You can however improve this a lot
by putting the channel's data directories and checkpoint directory on
separate disks (not always feasible), or you can just make sure you're
batching more events at a time.
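
Roughly like this (paths are made up; capacity and transactionCapacity are
event counts):

testagent.channels.fileCh.type = file
# keep the checkpoint and the data logs on different physical disks if possible
testagent.channels.fileCh.checkpointDir = /disk1/flume/checkpoint
testagent.channels.fileCh.dataDirs = /disk2/flume/data
testagent.channels.fileCh.capacity = 1000000
testagent.channels.fileCh.transactionCapacity = 10000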
>
> After some heavy tuning, I was able to get something that worked well, 
> and performed very well.  I was eventually able to get 200k per second 
> of these log events through.
>
> To cut to the chase, Here's some issues I had;
>
> 1. Data loss (this was brought up in another thread).  About every 
> other time (a little less, 40%) I would run the exact same test, it 
> would drop a very small number of events, 10 or less (out of about 
> 500k events).  Other times it would pass every event through without 
> issue.
I'm going to guess this is the (netcat) source not being able to hand
messages to the channel.  Since it doesn't inform the ingest system about
the failed delivery, the ingest side also can't resend.  I assume you're
not getting any exceptions like the other person who recently asked about
the netcat source?

> 2. Looking at my tunings, I was able to get about 60k per second on a 
> single flume instance with the above mentioned tuning.  I decided to 
> crank everything up (double it, even tried then doubling that once 
> more).  This machine has 48gb and is doing nothing but this.  So 
> logically, I figured I could bump my OPTS to 10g instead of 5, and up 
> my channel capacity to 8g.  Allowing me to buffer more and in theory 
> double my throughput.  This wasn't at all the case, by attempting to 
> throw more at it (either by lowering my sleep times between batches, 
> or even using the same sleep times but double my batch size from 
> 10lines to 20, things started flaking out).  Basically, after about 
> 200k lines went through, it just stopped processing, no warnings, 
> errors nothing.
> Here's where it gets interesting though.  I then setup four flume 
> agents on the same machine with all the same configurations and 
> startup params back at the 4g range, all listening on different ports. 
>  I started all 4, and then in parallel (on another machine), ran my 
> test script to hit all four agents.  That's when I was able to get 
> 200k through.  So by running four of them with lower tunables, I was 
> able to get the throughput I couldn't get running one with 4x the 
> tunables and startup options.
>
The channel capacity really shouldn't matter so long as it is large
enough to hold events in the interim until the sink drains them.
If you want to use one big agent, you may need to tune the sink
runners (which are single-threaded).  E.g. if you have a lot of data
coming in and one avro sink that just can't keep up, you can set up
multiple avro sinks draining the same channel (sketched below).  The
channel size setting should be made large enough to hold whatever amount
of data you expect to build up if a downstream server/write location
becomes unavailable.
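
Something like this (names and ports made up); each sink gets its own
runner thread pulling from the same channel, so the work is spread across
them:

agent.sinks = avroSink1 avroSink2

agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.channel = memChannel
agent.sinks.avroSink1.hostname = collector.example.com
agent.sinks.avroSink1.port = 4545

agent.sinks.avroSink2.type = avro
agent.sinks.avroSink2.channel = memChannel
agent.sinks.avroSink2.hostname = collector.example.com
agent.sinks.avroSink2.port = 4545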
>
> Number 2 is something i can easily live with but would like to hear 
> some insight on maybe what's causing it.  Obviously the disks can keep 
> up because all of the file_roll paths for all 4 agents are using the 
> same drive.  And obviously I have the ram to buffer accordingly.  But 
> for some reason, one agent with 2x or even 4x the juice starts getting 
> flaky.
>
> Number 1 is more concerning, this obviously will need to be solved.
The 2 rules I stick to for a lossless flow (your current system is
unfortunately breaking both):
1) An ingest system using an RPC delivery that is aware of failed sends and
responds by resending data (we have a Python program that tails files,
sends to a Scribe source, keeps a position pointer, and rewinds that
pointer when the Thrift RPC responds with failure; rough sketch below).
2) A lossless channel (currently file or jdbc).  This is generally only an
issue for restarts/failures.
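
Very roughly, the shape of that tailer (Python, with the actual
Scribe/Thrift client replaced by a placeholder send function you'd swap for
your RPC of choice; paths are made up):

import time

POSITION_FILE = "/var/run/tailer.pos"    # where the byte offset is persisted
LOG_FILE = "/var/log/app/current.log"    # the file being tailed
BATCH = 100                              # lines per RPC call

def send_batch(lines):
    # Placeholder for the real RPC client (Scribe/Thrift, Avro, ...).
    # Must return True only once the remote side has acknowledged the batch.
    return True

def load_pos():
    try:
        with open(POSITION_FILE) as f:
            return int(f.read().strip() or 0)
    except IOError:
        return 0

def save_pos(pos):
    with open(POSITION_FILE, "w") as f:
        f.write(str(pos))

def run():
    pos = load_pos()
    with open(LOG_FILE, "rb") as f:
        f.seek(pos)
        while True:
            lines = [line for line in (f.readline() for _ in range(BATCH)) if line]
            if not lines:
                time.sleep(1)            # nothing new yet; poll again
                continue
            if send_batch(lines):
                pos = f.tell()           # advance the pointer only after an ack
                save_pos(pos)
            else:
                f.seek(pos)              # rewind and retry the same batch
                time.sleep(1)

if __name__ == "__main__":
    run()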

>
> In summary, I'm willing and ready to spend more time on this.  But 
> wanted to get some insight from the pros, developers here and also 
> make sure I'm not crazy and maybe just trying to use this for more 
> than it was designed.
>
> Many thanks for anyone that stuck around to read this! :)
Thanks for your feedback!

Best of luck,
  Juhani
>
>
>  Cheers
>
> -- 
> Steve Johnson
> steve@webninja.com