You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by Scott Carey <sc...@richrelevance.com> on 2009/08/01 03:07:10 UTC

Re: Map performance with custom binary format

On 7/30/09 2:32 PM, "william kinney" <wi...@gmail.com> wrote:
>
> "If even a single task on a single large file is slower in MB/sec than your
> test program, then I suspect read/write buffer issues or misuse somewhere."
>   - Do you know of an instance where I'd have buffer issues with the
> Child process, and not local? The only difference I can think of is of
> course how the buffer is filled, FileInputStream vs FSDataInputStream.
> But once it is filled, why would reading portions of that buffer (ie,
> Arrays.copyOfRange()) take long in one instance but not another?

Is your local test writing to HDFS or local files?  Likewise, what happens
if your local test reads from HDFS instead of a local file?

If both tests use HDFS on both ends and the performance difference is still
there, then we have narrowed it down.  If the performance becomes much more
similar, we have likewise narrowed it down.

>
> Would it be helpful to get a histogram of the Arrays.copyOfRange(),
> rather than the average and total? Perhaps for the most part it is
> fine (~ 120 ns), but chokes sometimes (thefore increasing total time
> and average).
>
> Thanks for the help,
> Will
>
>
> On Thu, Jul 30, 2009 at 2:19 PM, Scott Carey<sc...@richrelevance.com> wrote:
>> Comments inline:
>>
>> On 7/30/09 7:37 AM, "william kinney" <wi...@gmail.com> wrote:
>>
>>> Local is executed on a Hadoop node (when no job is running), So same
>>> JRE/hardware.
>>>
>>> JRE:
>>> java version "1.6.0_13"
>>> Java(TM) SE Runtime Environment (build 1.6.0_13-b03)
>>> Java HotSpot(TM) 64-Bit Server VM (build 11.3-b02, mixed mode)
>>>
>>> JVM arguments for child task:
>>> /usr/java/jdk1.6.0_13/jre/bin/java
>>> -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/disk1/hadoop/
>>> ma
>>> pred/local/taskTracker/jobcache/job_200907242015_0048/attempt_200907242015_0
>>> 04
>>> 8_m_000008_0/work
>>> -Xmx486m
>>
>> You might benefit from JVM 1.6.0_14 with -XX:+UseCompressedOops, but that
>> would probably help both roughly equally.  It won't help much if you aren't
>> creating significant work for the garbage collector thouth.
>>
>>>
>>> Local call has no JVM arguments, just:
>>> java -cp <myjar>.jar com......RecordReaderTest <fileToTest>
>>>
>>
>> You might want to try -Xmx486m as an experiment on the local test to see if
>> it affects the behavior.  If you are doing a lot of garbage creation it may.
>>
>>
>>>
>>> Data is not compressed.
>>
>> Hmm, that was a random guess, because it would obviously affect CPU use.
>> Another thing to try -- make sure your writer that is writing into HDFS is
>> wrapped with a buffer (try 32k or 64k).  That's another random guess for
>> something that might not show up well in stack traces without a profiler --
>> but also might already be done.
>>
>>>
>>> JobTracker:
>>> Running: Started around 20, but as the job progressed it slowly
>>> increased to at the end: 432 (when Pending was 0). Running dropped to
>>> 0/Status was marked Succeeded about 10 seconds after that. Is this
>>> normal? The total # of Tasks was 1449.
>>
>> This is the "one new task per heartbeat" scheduler slowness.  The next
>> version of the Fair Scheduler will schedule many tasks in one heartbeat
>> which should make this faster.
>> Its a big reason that fewer, larger files was faster.  Though if you are CPU
>> bound, you only need 2 tasks running at the same time per node on your
>> hardware to be at near top efficiency.  Fewer tasks per node (say, 4) with
>> more RAM each (800MB) might do better on this sort of workload.
>>
>>
>>>
>>> Stack Traces.
>>> Looked at about 20 stack traces from 2 different nodes. Consistently saw:
>>> 2 x org.apache.hadoop.dfs.DFSClient$LeaseChecker @ Thread.sleep()
>>> "Comm thread for attempt_200907242015_0050_m_001409_0" @ Thread.sleep()
>>> "IPC Client (47) connection to <master-hostname>/192.168.1.100:8020
>>> from wkinney" @ Object.wait()
>>> "IPC Client (47) connection to /127.0.0.1:49202 from an unknown user"
>>> @ Object.wait()
>>> VM, GC, Signal Dispatcher, Low Memory Detector, CompilerThread,
>>> Finalizer, Reference Handler...
>>
>> Sounds like the usual threads that don't do much.
>>
>>>
>>> Then would sometimes see FastDateFormat thread, parseFrom(), or
>>> somewhere near there (e.g. MapRunner.run())
>>
>> The meat of the task.
>>
>>>
>>> Finally, I consistently saw this:
>>> "Thread-5" daemon prio=10 tid=0x0000000040bbfc00 nid=0x2f87 in
>>> Object.wait() [0x00007fb7498ce000..0x00007fb7498cebf0]
>>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>>         at java.lang.Object.wait(Native Method)
>>>         - waiting on <0x00007fb769fdec00> (a java.util.LinkedList)
>>>         at
>>> org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.j
>>> av
>>> a:1905)
>>>         - locked <0x00007fb769fdec00> (a java.util.LinkedList)
>>> I'm guessing this is normal DataNode activity...
>>
>> Yes, this is the normal dfs thread.  It can be hard to catch it doing work
>> with just stack traces and no profiler attached.
>>
>>>
>>> Will
>>>
>>>
>>
>> There is definitely a mystery here.  I expect the task scheduling delays and
>> some startup inefficiency but the overall difference is odd.  What about a
>> local test on a single, larger file versus a hadoop job on that same single,
>> larger file (which would have just one map job)?  This test may be very
>> enlightening.
>>
>>
>>> On Thu, Jul 30, 2009 at 1:31 AM, Scott Carey<sc...@richrelevance.com> wrote:
>>>> What is the JRE for the Hadoop nodes versus local?  What are the JVM
>>>> arguments for the child tasks and the local version (and heap size)?  What
>>>> is
>>>> the hardware and platform details for the nodes versus the local test?
>>>> Is the data compressed in Hadoop (check the config)?
>>>>
>>>> You mention the TaskTracker web inerface during a job, but what about the
>>>> JobTracker interface?  This should show the global view of currently
>>>> scheduled maps versus total slots.
>>>>
>>>> Lastly, check out some more stack traces on the tasks.  If they are all
>>>> still
>>>> in the DateFormat stuff?  Surely some of them should be in your parseFrom()
>>>> method too?
>>>>
>>>>
>>>> On 7/29/09 9:07 PM, "william kinney" <wi...@gmail.com> wrote:
>>>>
>>>> OK:
>>>>  implemented some iotop/iostat monitoring in ganglia. Looks pretty
>>>> standard (job was 23:00 to 23:06):
>>>>   - Single Node Disk Read: http://imagebin.org/57716
>>>>   - Single Node Disk Write: http://imagebin.org/57717
>>>>
>>>> On each node, noticed that the two TaskTracker$Child processes were
>>>> consuming close to 90% of each core. The TaskTracker and DataNode were
>>>> close to 0%. For the TT children, I did jstack dumps, but didn't
>>>> really see much that popped out other than a lot of time spent in a
>>>> SimpleDateFormat section and the protobuf parse. I switched the SDF
>>>> out with commons.lang FastDateFormat, which reduced the total time for
>>>> both the Hadoop job and the local/non-hadoop test, so still a
>>>> discrepancy between local and hadoop runs.
>>>>
>>>> "You can look at the logs for an individual task, and see how much data it
>>>> read, and how long it took.  It might be hitting your 50MB/sec or close in
>>>> a
>>>> burst, or perhaps not."
>>>>  - I decided to log the performance of each RecordReader use within
>>>> hadoop, which is essentially 1:1 for TaskTracker$Child process since I
>>>> have 1 InputSplit per file (ie, no splitting), right?. Saw:
>>>> Example 1) 527639090 bytes in : 18050 ms. (27.8778 MB/s)
>>>> Example 2) 533770314 bytes in : 23494 ms. (21.6669 MB/s)
>>>> Example 3) 529711886 bytes in : 20092 ms. (25.1429 MB/s)
>>>> ...etc
>>>> For reference, the non-hadoop/local test:
>>>> 530710906 bytes in : 9133 ms. (55.41721 MB/s)
>>>>
>>>> Regarding the JobTracker only doing 1 task / node / 2 seconds, that
>>>> will definitely hurt. Although the above discrepancy takes priority
>>>> for me, for now.
>>>>
>>>> "What does the web interface tell you about the number of concurrent map
>>>> tasks during the run?  Does it approach the max task slots?"
>>>>  - Yeah it definitely does, from the TaskTracker page on each node,
>>>> I'm seeing almost always 2 "RUNNING" tasks (and an accumulating list
>>>> of "COMMIT_PENDING" tasks under Non-Running, which slowly grows as the
>>>> job progresses). Normal?
>>>>
>>>> Also, I used a profiler to profile a local/non-hadoop test of the
>>>> RecordReader/Map():
>>>>  class: %Time
>>>>      org.apache.commons.lang.time.FastDateFormat.format(long):  46%
>>>>      com......parseFrom(byte[]):  42%
>>>>      java.io.FileInputStream.read(byte[], int, int): 5%
>>>>      ...rest are 1%'ish
>>>>  I guess this doesn't show anything helpful. I'll try to attach it to
>>>> hadoop remotely...anyone have any experience doing this w/ YourKit
>>>> Java Profiler?
>>>>
>>>> Anyways, decided to test the "large files" vs "small files" theory again:
>>>>  Small files (1449 files, ranging 10-100MB. average: 32 MB)
>>>>    - HDFS bytes read  49,057,491,374
>>>>    - Map input records  737,850,142
>>>>    - Finished in: 7mins, 26sec
>>>>    ... 104.898 MB/s
>>>>  Large files (22 files, around 500MB. average 514MB)
>>>>    - HDFS bytes read  11,852,421,152
>>>>    - Map input records 179,657,432
>>>>    - Finished in: 1mins, 8sec
>>>>    ... 166.225 MB/s
>>>>
>>>>   Not sure why before the large files were taking longer, perhaps the
>>>> SimpleDateFormat>FastDateFormat change? Anyways, good to see where I
>>>> need to take the file sizes too...but still 166 MB is not the rate I
>>>> was hoping for (given the # of nodes and local performance).
>>>>
>>>> So I guess in summary, hadoop TaskTracker$Child processes that are
>>>> doing the Map() and RecordReader are about 50% slower than the normal,
>>>> local non-hadoop version. In addition, their rate (~25MB/s) * Num
>>>> Nodes (10) suggests ~ 250MB/s total job performance, but I'm only
>>>> seeing ~166MB/s.
>>>>
>>>> Will
>>>>
>>>> On Tue, Jul 28, 2009 at 6:35 PM, Scott Carey<sc...@richrelevance.com>
>>>> wrote:
>>>>> See below:
>>>>>
>>>>>
>>>>> On 7/28/09 12:15 PM, "william kinney" <wi...@gmail.com> wrote:
>>>>>
>>>>>> Sorry, forgot to include that detail.
>>>>>>
>>>>>> Some data from ganglia:
>>>>>>
>>>>>>   CPU:
>>>>>>     - on all 10 nodes, I am seeing for the life of the job 85-95% CPU
>>>>>> usage, with about 10% of that being "System" CPU, vs "User".
>>>>>>     - Single node graph: http://imagebin.org/57520
>>>>>>     - Cluster graph: http://imagebin.org/57523
>>>>>
>>>>> Ok, CPU is definitely loaded.  Identify which processes are primarily
>>>>> responsible (Tasks? Datanode? Tasktracker?) You'll want to make the
>>>>> processes eating CPU during a run spit out some stack traces to 'profile'
>>>>> the activity.  Use either the 'jstack' utility with the JDK, or do a 'kill
>>>>> -3 <pid>' on a java process to spit out the stack trace to stdout.  You'll
>>>>> want to do this a handful of times on a single job if possible to identify
>>>>> any trends.
>>>>>
>>>>>>
>>>>>>   Memory:
>>>>>>     - Memory used before job is about 0.4GB, During job it fluctuates
>>>>>> up to 0.6GB and 0.7GB, then back down to 0.4GB. Most of the node
>>>>>> memory (8GB) is showing as "Cached".
>>>>>>     - Single node graph: http://imagebin.org/57522
>>>>>
>>>>> So the OS is mostly just caching disk files in RAM.
>>>>>
>>>>>>
>>>>>>   Network:
>>>>>>     - IN and OUT: Each node 6-12MB/s, cumulative about 30-44MB/s.
>>>>>>     - Single node graph: http://imagebin.org/57521
>>>>>>     - Cluster graph: http://imagebin.org/57525
>>>>>>
>>>>>
>>>>> That is a not insignificant, but cumulative across the cluster its not
>>>>> much.
>>>>>
>>>>>> iostat (disk) (sampled most of the nodes, below values are ranges I saw):
>>>>>>     tps: 0.41-1.27
>>>>>>     Blk_read/s: 46-58
>>>>>>     Blk_wrtn/s: 20-23
>>>>>> (have two disks per node, both SAS, 10k RPM)
>>>>>>
>>>>>
>>>>> Did you do iostat with a parameter to have it spit out more than one row?
>>>>> By default, it spits out data averaged since boot time, like vmstat.
>>>>> My favorite iostat params for monitoring are:
>>>>> iostat -mx 5
>>>>> iostat -dmx 5
>>>>> (or 10 or 15 or 60 second intervals depending on what I'm doing)  Ganglia
>>>>> might have some I/O info -- you want both iops and some sort of bytes/sec
>>>>> measurement.
>>>>>
>>>>>> ---
>>>>>> Are those Blk_read/wrtn/s as in block size (4096?) = bytes/second?
>>>>>>
>>>>>
>>>>> I think its the 512 byte block notion, but I always use -m to put it in
>>>>> useful units.
>>>>>
>>>>>> Also, from the job page (different job, same Map method, just more
>>>>>> data...~40GB. 781 files):
>>>>>> Map input records       629,738,080
>>>>>> Map input bytes         41,538,992,880
>>>>>>
>>>>>> Anything else I can look into?
>>>>>
>>>>> Based on your other email:
>>>>>
>>>>> There are almost 800 map tasks, these seem to mostly be data local.  The
>>>>> current implementation of the JobTracker schedules rather slowly, and can
>>>>> at
>>>>> best place one new task per node per 2 seconds or so on a small cluster.
>>>>> So, with 10 servers, it will take at least 80 seconds just to schedule all
>>>>> the tasks.
>>>>> If each server can run 8 tasks concurrently, then if the average task
>>>>> doesn't take somewhat longer than 16 seconds, the system will not reach
>>>>> full
>>>>> utilization.
>>>>>
>>>>> What does the web interface tell you about the number of concurrent map
>>>>> tasks during the run?  Does it approach the max task slots?
>>>>>
>>>>> You can look at the logs for an individual task, and see how much data it
>>>>> read, and how long it took.  It might be hitting your 50MB/sec or close in
>>>>> a
>>>>> burst, or perhaps not.
>>>>>
>>>>> Given the sort of bottlenecks I often see, I suspect the scheduling.  But,
>>>>> you have almost maxed CPU use, so its probably not that.  Getting stack
>>>>> dumps to see what the processor is doing during your test will help narrow
>>>>> it down.
>>>>>
>>>>>
>>>>>>
>>>>>> Do my original numbers (only 2x performance) jump out at you as being
>>>>>> way off? Or it is common to see that a setup similar to mine?
>>>>>>
>>>>>> I should also note that given its a custom binary format, I do not
>>>>>> support Splitting (isSplittable() is false). I don't think that would
>>>>>> count for such a large discrepancy in expected performance, would it?
>>>>>>
>>>>>
>>>>> If the files are all larger than the block size, it would cause a lot more
>>>>> network activity -- but unless your switch or network is broken or not
>>>>> gigabit -- there is a lot of capacity left in the network.
>>>>>
>>>>>> Thanks for the help,
>>>>>> Will
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 28, 2009 at 12:58 PM, Scott Carey<sc...@richrelevance.com>
>>>>>> wrote:
>>>>>>> Well, the first thing to do in any performance bottleneck investigation
>>>>>>> is
>>>>>>> to look at the machine hardware resource usage.
>>>>>>>
>>>>>>> During your test, what is the CPU use and disk usage?  What about
>>>>>>> network
>>>>>>> utilization?
>>>>>>> Top, vmstat, iostat, and some network usage monitoring would be useful.
>>>>>>>  It
>>>>>>> could be many things causing your lack of scalability, but without
>>>>>>> actually
>>>>>>> monitoring your machines to see if there is an obvious bottleneck its
>>>>>>> just
>>>>>>> random guessing and hunches.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 7/28/09 8:18 AM, "william kinney" <wi...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks in advance for the help!
>>>>>>>>
>>>>>>>> I have a performance question relating to how fast I can expect Hadoop
>>>>>>>> to scale. Running Cloudera's 0.18.3-10.
>>>>>>>>
>>>>>>>> I have custom binary format, which is just Google Protocol Buffer
>>>>>>>> (protobuf) serialized data:
>>>>>>>>
>>>>>>>>   669 files, ~30GB total size (ranging 10MB to 100MB each).
>>>>>>>>   128MB block size.
>>>>>>>>   10 Hadoop Nodes.
>>>>>>>>
>>>>>>>> I tested my InputFormat and RecordReader for my input format, and it
>>>>>>>> showed about 56MB/s performance (single thread, no hadoop, passed in
>>>>>>>> test file via FileInputFormat instead of FSDataInputStream) on
>>>>>>>> hardware similar to what I have in my cluster.
>>>>>>>> I also then tested some simple Map logic along w/ the above, and got
>>>>>>>> around 54MB/s. I believe that difference can be accounted for parsing
>>>>>>>> the protobuf data into java objects.
>>>>>>>>
>>>>>>>> Anyways, when I put this logic into a job that has
>>>>>>>>   - no reduce (.setNumReduceTasks(0);)
>>>>>>>>   - no emit
>>>>>>>>   - just protobuf parsing calls (like above)
>>>>>>>>
>>>>>>>> I get a finish time of 10mins, 25sec, which is about 106.24 MB/s.
>>>>>>>>
>>>>>>>> So my question, why is the rate only 2x what I see on a single thread,
>>>>>>>> non-hadoop test? Would it not be:
>>>>>>>>   54MB/s x 10 (Num Nodes) - small hadoop overhead ?
>>>>>>>>
>>>>>>>> Is there any area of my configuration I should look into for tuning?
>>>>>>>>
>>>>>>>> Anyway I could get more accurate performance monitoring of my job?
>>>>>>>>
>>>>>>>> On a side note, I tried the same job after combining the files into
>>>>>>>> about 11 files (still 30GB in size), and actually saw a decrease in
>>>>>>>> performance (~90MB/s).
>>>>>>>>
>>>>>>>> Any help is appreciated. Thanks!
>>>>>>>>
>>>>>>>> Will
>>>>>>>>
>>>>>>>> some hadoop-site.xml values:
>>>>>>>> dfs.replication  3
>>>>>>>> io.file.buffer.size   65536
>>>>>>>> dfs.datanode.handler.count  3
>>>>>>>> mapred.tasktracker.map.tasks.maximum  6
>>>>>>>> dfs.namenode.handler.count  5
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>