You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "Dongwook Kwon (JIRA)" <ji...@apache.org> on 2014/09/25 01:49:33 UTC

[jira] [Updated] (MAPREDUCE-6108) ShuffleError OOM while reserving memory by MergeManagerImpl

     [ https://issues.apache.org/jira/browse/MAPREDUCE-6108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongwook Kwon updated MAPREDUCE-6108:
-------------------------------------
    Summary: ShuffleError OOM while reserving memory by MergeManagerImpl  (was: ShuffleError OOM while reserving by MergeManagerImpl)

> ShuffleError OOM while reserving memory by MergeManagerImpl
> -----------------------------------------------------------
>
>                 Key: MAPREDUCE-6108
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6108
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.5.1
>            Reporter: Dongwook Kwon
>            Priority: Minor
>
> Shuffle has OOM issue from time to time.  
> Such as this email reported.
> http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-dev/201408.mbox/%3CCABWXXjNK-on0XTrMuriJD8SDGJjTAMSvQW2CZpm3oEkJ3YM8YQ@mail.gmail.com%3E
> {code}
> Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#14
>         at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
>         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
>         at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
>         at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
>         at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
>         at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
>         at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
>         at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
>         at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
> {code}
> Lowering mapreduce.reduce.shuffle.input.buffer.percent value mitigate the issue. However depending on the data and the memory system had, the issue comes back.
> From my test, when it's happening , the issue is very constant, memory foot print, and the point OOM happens was the same, regardless of the value of mapreduce.reduce.shuffle.input.buffer.percent( my test had default 0.7).  
> Here is what I found.
> According to MergeManagerImpl which implemented by https://issues.apache.org/jira/browse/MAPREDUCE-4808, it appears the reserve method deliberately allows just one thread(fetcher) to go over "memoryLimit" by checking the condition (usedMemory > memoryLimit) instead of (usedMemory + requestedSize > memoryLimit) to prevent stalling all fetchers issue as comment indicated. This seems working well most of times. However when the one fetcher tries to reserver usedMemory + requestedSize more than memoryLimit(Runtime.getRuntime().maxMemory()), I think there is OOM issue.
> {code}
>  @Override
> public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
> long requestedSize,
> int fetcher
> ) throws IOException {
> if (!canShuffleToMemory(requestedSize)) {
> LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
> " is greater than maxSingleShuffleLimit (" +
> maxSingleShuffleLimit + ")");
> return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
> jobConf, mapOutputFile, fetcher, true);
> }
> // Stall shuffle if we are above the memory limit
> // It is possible that all threads could just be stalling and not make
> // progress at all. This could happen when:
> //
> // requested size is causing the used memory to go above limit &&
> // requested size < singleShuffleLimit &&
> // current used size < mergeThreshold (merge will not get triggered)
> //
> // To avoid this from happening, we allow exactly one thread to go past
> // the memory limit. We check (usedMemory > memoryLimit) and not
> // (usedMemory + requestedSize > memoryLimit). When this thread is done
> // fetching, this will automatically trigger a merge thereby unlocking
> // all the stalled threads
> if (usedMemory > memoryLimit) {
> LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
> + ") is greater than memoryLimit (" + memoryLimit + ")." +
> " CommitMemory is (" + commitMemory + ")");
> return null;
> }
> // Allow the in-memory shuffle to progress
> LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
> + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
> + "CommitMemory is (" + commitMemory + ")");
> return unconditionalReserve(mapId, requestedSize, true);
> }
> {code}
> https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java#L256
> When the one fetcher tries to reserve (usedMemory + requestedSize > memoryLimit), depending on the memory the reducer has,  BoundedByteArrayOutputStream has the OOM issue at 
> {code}
>  public BoundedByteArrayOutputStream(int capacity, int limit) {
> this(new byte[capacity], 0, limit);
> }
> {code}
> https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java#L56
> memoryLimit is Runtime.getRuntime().maxMemory(), MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES seems for unit test.
> {code}
>     this.memoryLimit = 
>       (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
>           Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
>         * maxInMemCopyUse);
> {code}
> It explains why lowering mapreduce.reduce.shuffle.input.buffer.percent value resolves this issue and why the same setting sometimes works and doesn't.
> But I wasn't sure this is correct and what is the expected behavior for stalling fetchers issue to fix OOM issue as commented pointed out.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)