You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "Binglin Chang (JIRA)" <ji...@apache.org> on 2011/08/13 09:30:27 UTC

[jira] [Created] (MAPREDUCE-2841) Task level native optimization

Task level native optimization
------------------------------

                 Key: MAPREDUCE-2841
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
          Components: task
         Environment: x86-64 Linux
            Reporter: Binglin Chang


I'm recently working on native optimization for MapTask based on JNI. 

The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:

1. Sort is about 3x-10x as fast as java(only binary string compare is supported)

2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).

3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill

This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.

There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 

Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.

This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 






--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092524#comment-13092524 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

bq. Can you summarize how the memory management works in the current patch?
KeyValue Buffer memory management in the current patch is very simple, it has three parts:

MemoryPool
Hold the buffer of size io.sort.mb, and track current buffer usage
notice that this buffer will only occupy virtual memory not RSS(memory really used) if the memory is not 
actually accessed, this is better than java because java initialize arrays.
Memory lazy allocation is a beautiful feature :)

MemoryBlock 
Small chunk of memory block backed by MemoryPool, used by PartitionBucket
the default size of MemoryBlock = ceil(io.sort.mb / partition / 4 / MIN_BLOCK_SIZE) / MIN_BLOCK_SIZE
currently MIN_BLOCK_SIZE == 32K, it should be dynamically tuned according to partition number & io.sort.mb
The purpose of MemoryBlock is to reduce CPU cache miss. When sorting large indirect addressed KV pairs, 
I guess the sort time will be dominated by RAM random reads, so MemoryBlock is used to let each bucket 
get relatively continous memory.

PartitionBucket 
Store KV pairs for a partition, it has two arrays:
  vector<MemoryBlock *> blocks
    blocks used by this bucket
  vector<uint32_t> offsets 
    KV pair start offset in MemoryPool
    this vector is not under memory control(in io.sort.mb) yet, a bug needs to be fixed
      (use memory of MemoryPool, use MemoryBlock directly or move backward from buffer end)
    it uses less memory(1/3) than java kvindices, and use 1/2 of io.sort.mb memory at 
    most (when all k/v are empty), so it won't be much problem currently

Limitations of this approach:
Large partition number leads to small MemoryBlock
Large Key/Value can cause memory holes in small MemoryBlock

It's difficult to determine block size, since it relates to K/V size(like the old io.sort.record.percent),
200MB memory can only hold 12800 16K MemoryBlocks, so if average K/V size is a little bigger than 8K, 
half of the memory will likely be wasted.
This approach will not work well when partition number & Key/Value size is large, but this is rare case,
and it can be improved, just for example, we can use MemoryPool directly (disable MemoryBlock) if 
io.sort.mb/partiion number is too small

The other thing related to this is this approach only support simple synchronized collect/spill, I think this
will not harm performance very much. 
Asynchronized collect/spill needs tuning of io.sort.spill.percent, and we can make sort&spill really fast so 
parallel collect & spill is not so important as before, we can also let the original mapper thread to do sort&spill 
by enabling parallel sort&spill.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092972#comment-13092972 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

bq. It might make sense to commit that subset as optional functionality first, then iterate based on feedback.
I agree. How to contribute this to hadoop? Add a new subdirectory in contrib like streaming, or merge to native, or stay in current c++/libnativetask?
It contains both c++ and java code, and will likely to add client tools like streaming, and dev SDK.

Random memory config gives the Resource Scheduler more information so it may yield better schedule algorithms. As for OOM, there is a flex layer for memory control already, page cache. In typical slave node memory configuration and real cases, page cache (should) take considerable proportions of total memory(20%-50%), so for example tasks can be configured to use 60% of memory, but can have some variance in 20% range, and the variance become relatively small when multiple tasks combined to node level or whole job level.
One of my colleague is working on shuffle service, which delegate all reduce shuffle work to a per node service, this has some aspect which is similar:
For a single task, the variance of memory footprint is a problem, but it gets much stable for many tasks run on a node.


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (MAPREDUCE-2841) Task level native optimization

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated MAPREDUCE-2841:
-------------------------------------

    Attachment: dualpivotv20-0.patch
                dualpivot-0.patch

bq. Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort)

As Binglin points out, that's easy to test given a rote translation. I ported the code here mechanically:

http://permalink.gmane.org/gmane.comp.java.openjdk.core-libs.devel/2628

----

The goal of the last few implementations was a predictable memory footprint. Per-partition buckets are obviously faster to sort, but record skew and internal fragmentation in the tracking structures (and overhead in Java objects) motivated packing records into a single buffer. Can you summarize how the memory management works in the current patch?

It's exciting to see such a significant gain, here.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093534#comment-13093534 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

Update some test results.

1. Terasort 10G input 40map 40reduce on 9node cluster, 7map/7reduce slot per node
   io.sort.mb 500MB
   Results on jobhistory:
   ||        ||   Total ||  AverageMap || AverageShuffle || AverageReduce
   ||java     |   54s   |  14s  |         14s   |        10s |
   ||native   |   39s   |   7s   |        15s    |       9s|
   ||java-snappy |36s   |  15s      |     9s        |    8s|
   ||native-snappy|27s  |  7s |  7s | 8s |
   speedup-without-compression: 1.38
   speedup-with-compression: 1.33
   
2. I did another test of big data set
   Terasort 100G 400map 400reduce on 9node cluster, 7map/7reduce slot per node
   ||        ||   Total ||  AverageMap || AverageShuffle || AverageReduce
   ||java-snappy  | 277s  |  17s    |  28s  | 10s |
   ||native-snappy| 234s  |  10s    |  22s  | 10s |
   speedup: 1.18
   When cluster is under heavy workload, the bottleneck will be shown in page cache, shuffle, so optimizations in sort&spill do not play big roles.

3. I test the dual pivot quicksort patch provided by Chris, using the same test as test No.1
   There are no observable differences compare to old QuickSort,  Average map task time for java-snappy is the same as before(15s), perhaps the data set is too small, or the bottleneck is dominated by other factors, like memory random access.

 




> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092468#comment-13092468 ] 

Hadoop QA commented on MAPREDUCE-2841:
--------------------------------------

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12491995/dualpivotv20-0.patch
  against trunk revision .

    +1 @author.  The patch does not contain any @author tags.

    +1 tests included.  The patch appears to include 3 new or modified tests.

    -1 patch.  The patch command could not apply the patch.

Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/550//console

This message is automatically generated.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Allen Wittenauer (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093034#comment-13093034 ] 

Allen Wittenauer commented on MAPREDUCE-2841:
---------------------------------------------

Sure are a lot of header files with full blown functions in them....

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "He Yongqiang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093007#comment-13093007 ] 

He Yongqiang commented on MAPREDUCE-2841:
-----------------------------------------

we are also evaluating the approach of optimizing the existing Hadoop Java map side sort algorithms (like playing the same set of tricks used in this c++ impl: bucket sort, prefix key comparison, a better crc32 etc).

The main problem we are interested is how big is the memory problem for the java impl. 

Also it will be very useful here to define an open benchmark.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092303#comment-13092303 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

Hi, Scott
bq. there is quite a bit of juice to squeeze from the Java implementation.
I agree with that

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang updated MAPREDUCE-2841:
-------------------------------------

    Attachment: MAPREDUCE-2841.v1.patch

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang updated MAPREDUCE-2841:
-------------------------------------

    Attachment: DESIGN.html

I write a design document for this work, to better describe my thoughts. The document also include some test results. 
Generally, NativeTask outperforms original MapReduce framework, about 3x-7x 
for map task, 1x-1.1x for reduce task, 1.5x-5x for whole job. If the compiler 
hypothesis is correct, the speedup could be 4.5x-12x for map task, and the 
speedup for whole job should be larger correspondingly.

                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: DESIGN.html, MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Hadoop QA (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13198900#comment-13198900 ] 

Hadoop QA commented on MAPREDUCE-2841:
--------------------------------------

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
  against trunk revision .

    +1 @author.  The patch does not contain any @author tags.

    +1 tests included.  The patch appears to include 5 new or modified tests.

    -1 patch.  The patch command could not apply the patch.

Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/1749//console

This message is automatically generated.
                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: DESIGN.html, MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Dong Yang (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13176107#comment-13176107 ] 

Dong Yang commented on MAPREDUCE-2841:
--------------------------------------

Beautiful works beyond HCE! Contrib to binglin~
                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Scott Carey (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092160#comment-13092160 ] 

Scott Carey commented on MAPREDUCE-2841:
----------------------------------------

I think there is quite a bit of juice to squeeze from the Java implementation.  For example, Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort).  Java can (and likely will in the 0.23 branch) also benefit from hardware CRC. After that, I wonder how much faster a native implementation would actually be.



> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang updated MAPREDUCE-2841:
-------------------------------------

    Status: Patch Available  (was: Open)


I just submit a demo patch to show basic ideas and current progress. 
This patch contains map task optimization using NativeMapOutputCollector.

NativeMapOutputCollector use JNI to pass java k/v pairs and 
partition value to native side. As JNI has un-neglectable 
overhead(about 1000ns per empty JNI call wit 4-6 arguments), 
a k/v cache is added to pass k/v pairs to native side in batch. 
I suspect using a DirectBuffer will be better.

On the native side, k/v pairs are put into partitioned buffers,
this is different from java's single buffer approach. By doing so, 
sort can be much faster, because sort a big array is much slower
than sort many small arrays; small array also means less cache 
miss; and partition number does not needed to be compared 
in sort. 

Two light weighted io buffers: ReadBuffer & AppendBuffer, 
which has better performance than decorator based 
java & hadoop io streams. This greatly benefits IFile 
serialization, many method can be inlined. Currently 
compression is not supported yet, but it should not be a 
complex work to add block based compressor like snappy or 
quicklz.

This optimization is ONLY targeted for x86-64, little-endian
and assumes unaligned 64-bit loads and stores are cheap, 
like google-snappy.

About the patch:
I put cpp code into src/c++/libnativetask, and use a Makefile
separate from build.xml and src/native, because many things are 
not decided yet, so I don't want to mess up other components. 
# cd src/c++/libnativetask
# make
# copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
# set mapred.native.map.output.collector to true in jobconf

Again, this patch is just a demo, it is far from stable & complete, 
and has many known and unknown bugs.


Here is some test results:
1. running single task
   intel corei5 jdk6u24 on macbook pro
   input: 
     size  250000000 bytes 
     100 bytes per line
     [key 9bytes]\t[value 89bytes]\n
   KeyValueTextInputFormat
   IdentityMapper
   partition number 100
   io.sort.mb 400MB no mid-spill
   Total time and analysis:
   || ||JAVA||Native||Speedup||
   |Sort|5.27|0.8|6.5|
   |IFile SER|2.8|0.9|3.1|
   |Total|14.13|6.34|2.2|

2. Some result about partitioned sort & spill
   Input is the same as test 1
   ||partition |   1  |  10  |  20  |  40  |  80  | 200  | 400  | 800  | 1600 | 2000 |
   ||sort time | 2.21 | 1.47 | 1.27 | 1.11 | 0.9  | 0.71 | 0.65 | 0.58 | 0.51 | 0.5  |
   ||spill time| 0.49 | 0.43 | 0.41 | 0.39 | 0.36 | 0.33 | 0.32 | 0.32 | 0.29 | 0.28 |
   As is illustrated, partition number has great impact on sort & spill performance,
   This is largely because sort complexity and CPU cache effect.
   If partition number is P, recored count is N, we get:
   T = P * (N/P) * log(N/P) = N * log(N/P), so partition number matters.

3. Terasort 10G input 40map 40reduce on 9node cluster
   io.sort.mb 500MB
   Results on jobhistory:
   ||        ||   Total ||  AverageMap || AverageShuffle || AverageReduce
   ||java     |   54s   |  14s  |         14s   |        10s |
   ||native   |   39s   |   7s   |        15s    |       9s|
   ||java-lzo |   43s   |  15s      |     8s        |    8s|
   ||native-lzo|  ? | | | |
   speedup: 1.38
   Map output compression can reduce shuffle time significantly,
   so with better compression speed & less shuffle time impact,
   We can get better speedup when map output compression is finished. 
   I don't have large clusters to do more standard tests.
   

Current progress:
* NativeMapOutputCollector jni [done]
* io buffers [done]
* crc32/crc32c [done]
* integration of jobconfs [done]
* ifile writer [done]
* write ifile index file [done]
* ifile index reader [done]
* ifile reader [done]
* single-pass merge [done]
* handle big key/value
  In current implementation, if key/value length exceeds
  buffer length, there will be crash, I will add big 
  k/v handle soon.
* support block compression snappy
* multi-pass merge
* local io counters
* map-side combine
* parallel spill & merge
  Java has a spill thread to do collect/spill
  concurrently. Currently NativeMapOutputCollelctor only
  use one thread, but parallel sort&spill&merge is possible,
  since k/v pairs are partitioned now. This can further 
  speed up sort&spill&merge by simply adding threads.
* reduce task optimization
* support no sort
* support grouping 
* support pipes API, streaming


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Todd Lipcon (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093777#comment-13093777 ] 

Todd Lipcon commented on MAPREDUCE-2841:
----------------------------------------

One interesting C++ vs Java factor here is that this hot code is running in short-lived map tasks, the majority of the time. So there is less time for the JIT to kick in and actually compile the sorting code. In the past I've added -Xprof to mapred.child.java.opts, or looked at oprofile-jit output, and seen a fair amount of time spent in interpreted code.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Assigned] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang reassigned MAPREDUCE-2841:
----------------------------------------

    Assignee: Binglin Chang

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "He Yongqiang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093174#comment-13093174 ] 

He Yongqiang commented on MAPREDUCE-2841:
-----------------------------------------

bq. The bucketed sort used from 0.10 to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers).

If the java impl use the similar impl as the c++ one here, the only difference will be language. right? Sorry, can you explain more about how the c++ can do a better job here for predictable memory footprint? in the current java impl, all records (no matter which reducer it is going) are stored in a central byte array. In the c++ impl, on one mapper task, each reducer will have one corresponding partition bucket which maintains its own memory buffer. From what i understand, one partition bucket is for one reducer. and all records going to that reducer from the current maptask are stored there, will be sorted and spilled from there. From the sort part is that it save the number of comparison since the original sort will need to compared records from difference reducers. And the c++ impl has trick of doing prefix comparison which reduces the number of cpu ops (8 bytes compare -> one long cmp op).

bq. Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort.Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times.

I totally agree the spill will be the dominate factor if it is there. So here comes the problem that how much more memory the java impl will need compared to the c++ one. 20% or 50% or 100%? so we can calculate the chance of avoidable spilling if using the c++ impl.
(Note: based on our analysis on jobs running during the past one month, most jobs need to shuffle less than 700MB data per mapper.)


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13086977#comment-13086977 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

This patch is for 0.20 branch


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093507#comment-13093507 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

Hi, Yongqiang
bq. i may should make me more clear: we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl.
I think a pure java re-impl is possible, there are some tricks to slove memory fragmentation issues, maybe not throughly, for example letting many adjacent buckets share one MemoryBlock if partition number is too large, which is what I will do in native implementation. And again, it's hard to support stream like key/value serialization semantics, so the java re-impl has the same limitations as native impl has. 
But the low level unaligned memcmp & memcpy is hard to implement in java.


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093121#comment-13093121 ] 

Chris Douglas commented on MAPREDUCE-2841:
------------------------------------------

{quote}I agree. How to contribute this to hadoop? Add a new subdirectory in contrib like streaming, or merge to native, or stay in current c++/libnativetask?
It contains both c++ and java code, and will likely to add client tools like streaming, and dev SDK.{quote}

To pair the java/c++ code, a contrib module could make sense. Client tools and dev libraries are distant goals, though.

Contributing it to the 0.20 branch is admissible, but suboptimal. Most of the releases generated for that series are sustaining releases. While it's possible to propose a new release branch with these improvements, releasing it would be difficult. Targeting trunk would be the best approach, if you can port your code.

{quote}we are also evaluating the approach of optimizing the existing Hadoop Java map side sort algorithms (like playing the same set of tricks used in this c++ impl: bucket sort, prefix key comparison, a better crc32 etc).

The main problem we are interested is how big is the memory problem for the java impl.{quote}

Memory _is_ the problem. The bucketed sort used from 0.10(?) to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers). Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort. Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times. Merging from fewer files also decreases the chance of failure and reduces seeks across all drives (by spreading output over fewer disks). A precise memory footprint also helped application authors calculate the framework overhead (both memory and number of spills) from the map output size without considering the number of reducers.

That said, jobs matching particular profiles admit far more aggressive optimization, particularly if some of the use cases are ignored. Records larger than the sort buffer, user-defined comparators (particularly on deserialized objects), the combiner, and the intermediate data format restrict the solution space and complicate implementations. There's certainly fat to be trimmed from the general implementation, but restricting the problem will admit far more streamlined solutions than identifying and branching on all the special cases.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13094154#comment-13094154 ] 

Chris Douglas commented on MAPREDUCE-2841:
------------------------------------------

bq. we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl. So the thing that we mostly cared about is that is there sth that the c++ impl can do and a java re-impl can not. And if there is, we need to find out how much is that difference. And from there we can have a better understand of each approach and decide which approach to go.

Sorry, that's what I was trying to answer. A system matching your description existed in 0.16 and tests of the current collector show it to be faster for non-degenerate cases and far more predictable. The bucketed model inherently has some internal fragmentation which can only be eliminated by using expensive buffer copies and compactions or by using per-record byte arrays, where the 8 byte object overhead exceeds the cost of tracking the partition, requiring only 4 bytes. Eliminating that overhead is impractical, but even mitigating it (e.g. allowing partitions to share slabs) requires that one implement an allocation and memory management system across Java byte arrays or ByteBuffers, themselves allocated by the JVM. I would expect that system to be easier to write and maintain than even the current impl, but not trivial if it supports all of the existing use cases and semantics. Unlike the C++ impl (and like the current one), abstractions will likely be sacrificed to avoid the overheads.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "He Yongqiang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093324#comment-13093324 ] 

He Yongqiang commented on MAPREDUCE-2841:
-----------------------------------------

sorry, i am kind of confused. i may should make me more clear: we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl. So the thing that we mostly cared about is that is there sth that the c++ impl can do and a java re-impl can not. And if there is, we need to find out how much is that difference. And from there we can have a better understand of each approach and decide which approach to go. 

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092322#comment-13092322 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

Has someone already done a benchmark of hadoop running on java 7 vs java 6, and share some results? 
I'm afraid I don't have enough resource to do standard benchmark, I can do some simple tests but may not convincing.

Hadoop uses it's own QuickSort & HeapSort implementation and interface, if dual pivot quicksort & Timsort is much faster, I think we should do some test, and add it to hadoop(this does not require java7).

The current implementation is very naive, and has a long way to be further optimized. For example, sort just use std::sort. 

The (very)long term goal for this work, is to provide a independent task-level native runtime and API. Users can use native api to develop applications, but java application also get part of the performance benefits. It opens up further optimization possibilities, both in framework and application layer. 




> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Scott Carey (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13094214#comment-13094214 ] 

Scott Carey commented on MAPREDUCE-2841:
----------------------------------------

{quote}
There are no observable differences compare to old QuickSort
{quote}

Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort.  However, it should have fewer swaps (0.80 times as many).  If the cost of comparison is high (larger records, object comparison) the effect will be minimal.  If the cost of comparison is low (values, very simple objects) the performance difference can be larger, up to about 2.5x as fast for sorting an array of ints. 

http://mail.openjdk.java.net/pipermail/core-libs-dev/2010-August/004687.html  (initial message, many more if you search).

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang updated MAPREDUCE-2841:
-------------------------------------

    Attachment: MAPREDUCE-2841.v2.patch

I just attach a updated patch, in case if anybody has interest. Changes:
# Separate key type & value type, theoretically value can be any type.
# Add google-snappy compression
# iobuffer don't use templates anymore 


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093501#comment-13093501 ] 

Hadoop QA commented on MAPREDUCE-2841:
--------------------------------------

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
  against trunk revision .

    +1 @author.  The patch does not contain any @author tags.

    +1 tests included.  The patch appears to include 5 new or modified tests.

    -1 patch.  The patch command could not apply the patch.

Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/563//console

This message is automatically generated.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "He Yongqiang (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13199485#comment-13199485 ] 

He Yongqiang commented on MAPREDUCE-2841:
-----------------------------------------

Really cool stuff! 
In terms of CPU performance, can you also do some comparison with new java output collector that Todd mentioned (https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java)? In facebook's internal tests, we have seen a big improvement(8x-10x) for cpu spent in sort (e.g., example 1: 
240M->30M, example 2: 9M->1.8M), and total mapper CPU of 2x ( example 1: 707M -> 440M, example 2: 40M->19M). They are CPU numbers, not latency numbers. BlockMapOutputBuffer.java uses only one thread but the original collector uses 2 threads. But the latency is still improved by a lot (like 30%). 

with some analysis on performance differences, it will really help understand some bottlenecks and the difference that language brings.


                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: DESIGN.html, MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Todd Lipcon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169992#comment-13169992 ] 

Todd Lipcon commented on MAPREDUCE-2841:
----------------------------------------

I chatted recently with the MR guys over at Facebook. They have another implementation here which they've been working on that gives similar gains, while staying all in java. The approach is something like the following:
- map output collector collects into small buffers, each sized something close to L3 cache
- when any buffer is full, sort it but don't spill it
- when enough buffers are collected to fill io.sort.mb, merge them from memory to disk

This fixes the cache locality issues that everyone has identified, but doesn't require native code. It's up on their github here: https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java

Maybe Yongqiang can comment more on this approach? Dmytro has given permission offline for us to work from the code on their github and contribute it on trunk (they may not have time to contribute it in the nearterm)

I think a short term goal for this area we could attack would be to make the map output collector implementation pluggable. Then people can experiment more freely with different collector implementations. I don't have time for it - just throwing it out there as a thought.
                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "MengWang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13091491#comment-13091491 ] 

MengWang commented on MAPREDUCE-2841:
-------------------------------------

Good job! 

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092596#comment-13092596 ] 

Chris Douglas commented on MAPREDUCE-2841:
------------------------------------------

{quote}Just an idea, what if memory related configurations can be a random variable, 
with mean & variance? Can this leads to better resource utilization? A fixed memory bound 
always means application will request more memory than they really need.
I think in many cases predictable memory control is enough, rather than precise memory control, 
since it's impractical. We can use some dynamic memory if it is in a predicable range, 
for example +/-20%, +-30%, etc.{quote}

The fixed memory bound definitely causes resource waste. Not only will users ask for more memory than they need (particularly since most applications are not tightly tuned), but in our clusters, users will just as often request far too little. Because tasks' memory management is uniformly specified within a job, there isn't even an opportunity for the framework to adapt to skew.

The random memory config is an interesting idea, but failed tasks are regrettable and expensive waste. For pipelines with SLAs, "random" failures will probably motivate users to jack up their memory requirements to match the range (which, if configurable, seems to encode the same contract). The precise specification was avoiding OOMs; because the collection is across a JNI boundary, a "relaxed" predictable memory footprint could be easier to deploy, assuming a hard limit in the native code to avoid swapping.

Thanks for the detail on the collection data structures. That makes it much easier to orient oneself in the code.

A few quick notes on your [earlier|https://issues.apache.org/jira/browse/MAPREDUCE-2841?focusedCommentId=13086973&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13086973] comment:

Adding the partition to the record, again, was to make the memory more predictable. The overhead in Java tracking thousands of per-partition buckets (many going unused) was worse than the per-record overhead, particularly in large jobs. Further, user comparators are often horribly inefficient, so the partition comparison and related hit to its performance was in the noise. The cache miss is real, but hard to reason about without leaving the JVM.

The decorator-based stream is/was? required by the serialization interface. While the current patch only supports records with a known serialized length, the contract for other types is more general. Probably too general, but users with occasional several-hundred MB records (written in chunks) exist. Supporting that in this implementation is not a critical use case, since they can just use the existing collector. Tuning this to handle memcmp types could also put the burden of user comparators on the serialization frameworks, which is probably the best strategy. Which is to say: obsoleting the existing collection framework doesn't require that this support all of its use cases, if some of those can be worked around more competently elsewhere. If its principal focus is performance, it may make sense not to support inherently slow semantics.

Which brings up a point: what is the scope of this JIRA? A full, native task runtime is a formidable job. Even if it only supported memcmp key types, no map-side combiner, no user-defined comparators, and records smaller than its intermediate buffer, such an improvement would still cover a lot of user jobs. It might make sense to commit that subset as optional functionality first, then iterate based on feedback.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13199498#comment-13199498 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

@Yongqiang
It seams BlockMapOutputBuffer only support BytesWritable currently? So Wordcount & Terasort can't run directly .  From the test result, Map Avg time and sort time, I would say sort take about 40-50% time of whole map task, because sort is CPU intensive, the CPU time should be more, about 50%-60% maybe. 
If my compiler assumption is right, total speedup for Wordcount mapper should be 10x, sort speedup should be 10x-12x, and the rest(reader, mapper, merge, spill combined) should be (10-0.5*12)/(1-0.5)=8. 
I must say using quicksort to sort small buffers fit into cache then merge them is a good idea, I should make this optimization too. 
NativeTask currently use single thread currently, but I think all partition based collector, including BlockMapOutputCollector) can take advantage of parallel sort & spill I mentioned in the design doc, this needs code changes to other part(TaskTracker,IndexCache maybe), and change map output file to a directory.


                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: DESIGN.html, MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169965#comment-13169965 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

bq. Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort. However, it should have fewer swaps (0.80 times as many). 
Unfortunately, the main overhead of current sort in Hadoop comes from comparison, the swaps just swap two integer index, I thinks that's why Dual-pivot quicksort don't show any improvements. As Todd's results in MAPREDUCE-3235 and I experienced in this issue, the main overhead for the current Hadoop sort implementation is cache miss, nearly all comparison operations cause 2 random memory access in a huge memory area(typically X00MB). 
So it's not language differentes, just implementation differentes, to get better performance, we can:
Add index in MAPREDUCE-3235, or use partition bucket based sort.

I port DualPivotQuickSort java code to C++ and tested it on my intel i5 macbookpro, with terasort 10bytes key type and word key type in RandomTextWriter.

TeraSort input data 50MB 500000 key/value pair
11/12/15 12:18:16 INFO qsort time: 0.23108s
11/12/15 12:18:16 INFO std::sort time: 0.18266s
11/12/15 12:18:17 INFO DualPivotQuicksort time: 0.17167s

About 6% faster, I think sorting an array of ints can get much better results, cause compare two inplace ints is much faster than campare two indexed binary string.

Some updates about my work, I almost finished whole native mapTask, and part of native reduce task.
As for native MapTask with C++ RecordReader, Mapper, Partitioner, MapOutputCollector, a native MapTask now can process 250MB(47MB compressed) terasort input data in just 1.6s, comparing this with the earlier test results(14s for java, 7s for java with NativeMapOutputCollector), it is a huge speed up, and it can be further optimized.


                
> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, MAPREDUCE-2841.v2.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13092522#comment-13092522 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

bq. The goal of the last few implementations was a predictable memory footprint. 

Predictable memory foot print is very important, I strongly agree. 
Observations of our cluster's memory status shows that most OOM or high memory consumption
cases are caused by big Key/Value in InputReader, Key/Value writable and merge. Sort&spill
is much more stable. 
I think in many cases predictable memory control is enough, rather than precise memory control, 
since it's impractical. We can use some dynamic memory if it is in a predicable range, 
for example +/-20%, +-30%, etc.
Just an idea, what if memory related configurations can be a random variable, 
with mean & variance? Can this leads to better resource utilization? A fixed memory bound 
always means application will request more memory than they really need.


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Binglin Chang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13086978#comment-13086978 ] 

Binglin Chang commented on MAPREDUCE-2841:
------------------------------------------

I think this work can help improving hadoop in many ways:
#  Reduce total resource consumption, mainly CPU
#  Speed up job execution, better response time
#  More precise memory control, important feature in production system
#  More programming interface, c/cpp, python, etc.
#  Opens up further optimization possibility


> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (MAPREDUCE-2841) Task level native optimization

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13093291#comment-13093291 ] 

Chris Douglas commented on MAPREDUCE-2841:
------------------------------------------

bq. If the java impl use the similar impl as the c++ one here, the only difference will be language. right?

Yes, but the language difference includes other overheads (more below).

bq. Sorry, can you explain more about how the c++ can do a better job here for predictable memory footprint? in the current java impl, all records (no matter which reducer it is going) are stored in a central byte array. In the c++ impl, on one mapper task, each reducer will have one corresponding partition bucket which maintains its own memory buffer. From what i understand, one partition bucket is for one reducer. and all records going to that reducer from the current maptask are stored there, will be sorted and spilled from there.

Each partition bucket maintins its own memory buffer, so the memory consumed by the collection framework includes the unused space in all the partition buffers. I'm calling that, possibly imprecisely, internal fragmentation. The {{RawComparator}} interface also requires that keys be contiguous, introducing other "waste" if the partition's collection buffer were not copied whenever it is expanded (as in 0.16; the expansion/copying overhead also harms performance and makes memory usage hard to predict because both src and dst buffers exist simultaneously), i.e. a key partially serialized at the end of a slab must be realigned in a new slab. This happens at the end of the circular buffer in the current implementation, but would happen on the boundary of every partition collector chunk.

That internal fragmentation creates unused buffer space that "prematurely" triggers a spill to reclaim the memory. Allocating smaller slabs decreases internal fragmentation, but also adds an ~8 byte object tracking overhead and GC cycles. In contrast, large allocations (like the single collection buffer) are placed directly in permgen. The 4 byte overhead per record to track the partition is a space savings over slabs exactly matching each record size, requiring at least 8 bytes per record if naively implemented.

The current implementation is oriented toward stuffing the most records into a precisely fixed amount of memory, and adopts a few assumptions: 1) one should spill as little as possible 2) if spilling is required, at least don't block the mapper 3) packing the most records into each spill favors MapTasks with combiners. If there are cases (we all acknowledge that there are) where spilling more often but _faster_ can compensate for that difference, then it's worth reexamining those assumptions.

> Task level native optimization
> ------------------------------
>
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch, dualpivot-0.patch, dualpivotv20-0.patch
>
>
> I'm recently working on native optimization for MapTask based on JNI. 
> The basic idea is that, add a NativeMapOutputCollector to handle k/v pairs emitted by mapper, therefore sort, spill, IFile serialization can all be done in native code, preliminary test(on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as java(only binary string compare is supported)
> 2. IFile serialization speed is about 3x of java, about 500MB/s, if hardware CRC32C is used, things can get much faster(1G/s).
> 3. Merge code is not completed yet, so the test use enough io.sort.mb to prevent mid-spill
> This leads to a total speed up of 2x~3x for the whole MapTask, if IdentityMapper(mapper does nothing) is used.
> There are limitations of course, currently only Text and BytesWritable is supported, and I have not think through many things right now, such as how to support map side combine. I had some discussion with somebody familiar with hive, it seems that these limitations won't be much problem for Hive to benefit from those optimizations, at least. Advices or discussions about improving compatibility are most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks if key/value type, comparator type, combiner are all compatible, then MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, more work need to be done. I expect better final results, and I believe similar optimization can be adopt to reduce task and shuffle too. 

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira