Posted to mapreduce-issues@hadoop.apache.org by "Robert Schmidtke (JIRA)" <ji...@apache.org> on 2017/08/03 07:13:00 UTC

[jira] [Comment Edited] (MAPREDUCE-6923) YARN Shuffle I/O for small partitions

    [ https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112313#comment-16112313 ] 

Robert Schmidtke edited comment on MAPREDUCE-6923 at 8/3/17 7:12 AM:
---------------------------------------------------------------------

Hi Ravi,

My guess is that since {{trans}} is a {{long}} and {{ByteBuffer.allocate(...)}} only takes an {{int}}, a "blind" cast inside the {{Math.min(...)}} call can yield a negative value when {{trans > Integer.MAX_VALUE}}:

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        // Integer.MAX_VALUE + 1L = 2^31, so (int) trans overflows to Integer.MIN_VALUE
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        // Math.min(...) then returns the negative value, which allocate(...) rejects
        ByteBuffer byteBuffer = ByteBuffer
                .allocate(Math.min(shuffleBufferSize, (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

gives

{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at test.Test.main(Test.java:12)
{code}

whereas

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        // clamp trans to Integer.MAX_VALUE before the narrowing cast
        ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
                trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

correctly outputs {{131072}}.
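
As a side note (just a sketch, not the attached patch): an equivalent and slightly shorter guard is to take the minimum in {{long}} arithmetic and cast only afterwards; since {{shuffleBufferSize}} is a non-negative {{int}} and {{trans}} is non-negative here, the result of {{Math.min}} always fits into an {{int}}:

{code:java}
package test;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        // Take the minimum in long arithmetic first; since shuffleBufferSize is a
        // non-negative int (and trans >= 0 here), the result never exceeds
        // Integer.MAX_VALUE, so the narrowing cast cannot overflow.
        ByteBuffer byteBuffer = ByteBuffer
                .allocate((int) Math.min(shuffleBufferSize, trans));
        System.out.println(byteBuffer.capacity()); // prints 131072
    }
}
{code}

Both variants yield the same buffer size; the ternary in the patch just makes the clamping explicit.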

As for the other 18% issue, I am not quite sure yet. I'm currently investigating the I/O of each of Hadoop's components, using TeraSort as my workhorse. For a TeraSort of 1024 GiB, YARN reads a total of 1169 GiB in my setup with {{transferTo.allowed=true}}. Taking into account that the MapReduce framework counters report 1065 GiB of serialized map output (and thus 1065 GiB of shuffled bytes), the overhead is "only" 104 GiB for 1024 GiB of input, or roughly 10%. So there are additional reads even when using {{transferTo}}. Maybe it has something to do with resource distribution? Note that I have disabled speculative execution, so there are no extra attempts by additional reducers that might read the same map output multiple times. However, there are 140 "Failed Shuffles" -- does that mean those shuffles were executed again? If so, and assuming that each of the 2048 reducers needs to fetch {{1065 / 2048 = 0.52 GiB}}, the failed shuffles add an overhead of {{140 * 0.52 = 73 GiB}}. What remains is an unexplained 31 GiB.
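
To spell out this back-of-envelope accounting (the figures are exactly the counter values quoted above; that each of the 140 failed shuffles re-fetches one reducer's share is only an assumption):

{code:java}
package test;
public class ShuffleOverheadCheck {
    public static void main(String[] args) {
        // Figures from the 1024 GiB run quoted above, all in GiB.
        double totalRead = 1169;      // total bytes read by YARN (measured)
        double shuffled = 1065;       // serialized map output / shuffled bytes (counters)
        double overhead = totalRead - shuffled;      // 104 GiB, roughly 10% of the input
        double perReducerFetch = shuffled / 2048;    // about 0.52 GiB per reducer
        double refetched = 140 * perReducerFetch;    // about 73 GiB, if failed shuffles are re-fetched
        double unexplained = overhead - refetched;   // about 31 GiB remain unexplained
        System.out.printf("overhead=%.0f GiB, refetched=%.0f GiB, unexplained=%.0f GiB%n",
                overhead, refetched, unexplained);
    }
}
{code}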

When running TeraSort with {{transferTo.allowed=false}} and my patch as described above, sorting 256 GiB, the MapReduce framework counters report 266 GiB of serialized map output (and thus 266 GiB of shuffled bytes). In this run there were no "Failed Shuffles". Since my analysis shows that YARN reads 300 GiB, the overhead is probably more accurately measured as 34 GiB (= 13% of 256 GiB) rather than 45 GiB (= 18% of 256 GiB). These 34 GiB are close enough to the 31 GiB for the 1024 GiB input (see above), so maybe this is a constant overhead for 2048 mappers and 2048 reducers?
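
And the same quick check for the 256 GiB run, again using only the figures quoted above:

{code:java}
package test;
public class OverheadCheck256 {
    public static void main(String[] args) {
        // Figures from the 256 GiB run quoted above, all in GiB.
        double input = 256;
        double shuffled = 266;   // serialized map output / shuffled bytes (counters)
        double totalRead = 300;  // total bytes read by YARN (my analysis)
        double reported = 45;    // overhead reported in the issue description
        double overhead = totalRead - shuffled;
        System.out.printf("overhead vs. shuffled bytes: %.0f GiB = %.0f%% of input%n",
                overhead, 100 * overhead / input);
        System.out.printf("originally reported overhead: %.0f GiB = %.0f%% of input%n",
                reported, 100 * reported / input);
    }
}
{code}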

Anyway, since I'll be investigating this behavior further, digging into per-file statistics, I'll be able to report exactly which files are read, how often, and how much of each is read, and thus tell exactly what is happening on disk. Since this is part of unpublished research, however, I'm afraid I can only report the results later.


> YARN Shuffle I/O for small partitions
> -------------------------------------
>
>                 Key: MAPREDUCE-6923
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6923
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>         Environment: Observed in Hadoop 2.7.3 and above (judging from the source code of future versions), and Ubuntu 16.04.
>            Reporter: Robert Schmidtke
>            Assignee: Robert Schmidtke
>         Attachments: MAPREDUCE-6923.00.patch
>
>
> When a job configuration results in small partitions read by each reducer from each mapper (e.g. 65 kilobytes as in my setup: a [TeraSort|https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java] of 256 gigabytes using 2048 mappers and reducers each), and setting
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transferTo.allowed</name>
>   <value>false</value>
> </property>
> {code}
> then the default setting of
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transfer.buffer.size</name>
>   <value>131072</value>
> </property>
> {code}
> results in almost 100% overhead in reads during shuffle in YARN, because for each 65K needed, 128K are read.
> I propose a fix in [FadvisedFileRegion.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java#L114] as follows:
> {code:java}
> ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.shuffleBufferSize, trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
> {code}
> e.g. [here|https://github.com/apache/hadoop/compare/branch-2.7.3...robert-schmidtke:adaptive-shuffle-buffer]. This sets the shuffle buffer size to the minimum of the buffer size specified in the configuration (128K by default) and the actual partition size (65K on average in my setup). In my benchmarks this reduced the read overhead in YARN from about 100% (255 additional gigabytes, as described above) down to about 18% (an additional 45 gigabytes). The runtime of the job remained the same in my setup.


