Posted to issues@tez.apache.org by "Gopal Vijayaraghavan (Jira)" <ji...@apache.org> on 2020/05/26 16:19:00 UTC

[jira] [Commented] (TEZ-4189) Support fault tolerance in pipelined data transfer for unordered output

    [ https://issues.apache.org/jira/browse/TEZ-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116865#comment-17116865 ] 

Gopal Vijayaraghavan commented on TEZ-4189:
-------------------------------------------

bq.  because fault tolerance wasn't implemented for pipelined shuffle

A failure tolerance model worked on JDK7 and stopped working on JDK8, because we routinely use HashMap implementations in our operations to aggregate things in memory and then read them out into a shuffle output.

The change described in http://openjdk.java.net/jeps/180 prevents something like an in-memory group-by hash from returning its rows in the same order on every run.

So the second attempt of the same task might not put the same rows into spill 1, spill 2, etc.
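
To make the spill problem concrete, here is a minimal sketch. It is not Tez code: `SpillOrderSketch`, `spill` and `aggregateInOrder` are made-up names, and a `LinkedHashMap` with two different insertion orders stands in for a hash map that happened to iterate differently in two attempts. Both attempts aggregate the same keys, but cutting the iteration into fixed-size spills puts different rows into each spill.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: none of these names exist in Tez.
final class SpillOrderSketch {

    // Cut the iteration order of an in-memory aggregation into fixed-size spills.
    static List<List<String>> spill(Map<String, Long> aggregated, int rowsPerSpill) {
        List<List<String>> spills = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Map.Entry<String, Long> e : aggregated.entrySet()) {
            current.add(e.getKey() + "=" + e.getValue());
            if (current.size() == rowsPerSpill) {
                spills.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            spills.add(current);
        }
        return spills;
    }

    // Assumption: LinkedHashMap simulates "a hash map that iterated in this order".
    // A real HashMap would not let us pick the order, which is exactly the problem.
    static Map<String, Long> aggregateInOrder(String... keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same rows in both attempts, different iteration order.
        List<List<String>> attempt0 = spill(aggregateInOrder("a", "b", "c", "d"), 2);
        List<List<String>> attempt1 = spill(aggregateInOrder("c", "a", "d", "b"), 2);
        System.out.println("attempt 0 spills: " + attempt0);
        System.out.println("attempt 1 spills: " + attempt1);

        // A reducer that took spill 0 from attempt 0 and spill 1 from attempt 1.
        List<String> mixed = new ArrayList<>(attempt0.get(0));
        mixed.addAll(attempt1.get(1));
        System.out.println("mixed read: " + mixed);
    }
}
```

In this sketch, a reducer that reads the first spill from attempt 0 and the second spill from attempt 1 sees key `b` twice and never sees `c`, even though each attempt on its own is complete and correct.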

This requires us to restart the process, since we cannot restart a reducer from a fixed spill-id on a second attempt of a mapper, and if we've already processed a spill for the reducer, we can't undo it.
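
The consequence on the reducer side can be sketched roughly as follows. This is only the idea behind a fail-fast check, not the actual ShuffleManager code: `PipelinedInputTracker`, `onSpillEvent` and the choice of `IllegalStateException` are assumptions made for illustration. Once any spill of one attempt has been accepted for an input, a spill event from a different attempt of the same input cannot be mixed in safely, so the only option left is to fail.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the class and method names are not Tez APIs.
final class PipelinedInputTracker {

    // Per-input bookkeeping: which attempt we committed to and which spills we saw.
    private static final class State {
        final int attemptNumber;
        final BitSet spillsSeen = new BitSet();

        State(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    private final Map<Integer, State> stateByInput = new HashMap<>();

    // Accept a spill event, or fail if it belongs to another attempt of an
    // input whose earlier spills may already have been consumed.
    void onSpillEvent(int inputIndex, int attemptNumber, int spillId) {
        State state = stateByInput.get(inputIndex);
        if (state == null) {
            state = new State(attemptNumber);
            stateByInput.put(inputIndex, state);
        } else if (state.attemptNumber != attemptNumber) {
            // Consumed spills cannot be undone, and the new attempt's spills
            // are not guaranteed to contain the same rows per spill id.
            throw new IllegalStateException("input " + inputIndex
                    + ": already consumed spills " + state.spillsSeen
                    + " of attempt " + state.attemptNumber
                    + ", cannot switch to attempt " + attemptNumber);
        }
        state.spillsSeen.set(spillId);
    }

    public static void main(String[] args) {
        PipelinedInputTracker tracker = new PipelinedInputTracker();
        tracker.onSpillEvent(100, 0, 0);
        tracker.onSpillEvent(100, 0, 1);
        try {
            tracker.onSpillEvent(100, 1, 0); // second attempt of the same input
        } catch (IllegalStateException e) {
            System.out.println("fail fast: " + e.getMessage());
        }
    }
}
```

Failing early is the conservative choice in this sketch: it trades a reducer restart for never producing duplicated or missing rows.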

> Support fault tolerance in pipelined data transfer for unordered output
> -----------------------------------------------------------------------
>
>                 Key: TEZ-4189
>                 URL: https://issues.apache.org/jira/browse/TEZ-4189
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: syslog_attempt_1589892856650_0073_1_07_000100_0, syslog_attempt_1589892856650_0073_1_07_000100_1, syslog_attempt_1589892856650_0073_1_08_000945_0
>
>
> Attached an example:
> Mapper task attempt 0 fails while spilling spill3:  [^syslog_attempt_1589892856650_0073_1_07_000100_0] 
> {code}
> 2020-05-25 13:08:10,702 [INFO] [UnorderedOutSpiller {Reducer_14} #0] |writers.UnorderedPartitionedKVWriter|: Writing spill 3 to /grid/2/yarn/nm/usercache/root/appcache/application_1589892856650_0073/output/attempt_1589892856650_0073_1_07_000100_0_10003_3/file.out
> 2020-05-25 13:08:10,736 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Received should die response from AM
> {code}
> Mapper task attempt 1 passes, successfully spills out 7 spills:  [^syslog_attempt_1589892856650_0073_1_07_000100_1] 
> {code}
> 2020-05-25 13:09:47,722 [INFO] [TezChild] |writers.UnorderedPartitionedKVWriter|: Reducer_14: Adding spill event for spill (final update=true), spillId=7
> {code}
> Reducer tasks get killed because they cannot recover from the mapper task attempt failure, because fault tolerance wasn't implemented for pipelined shuffle I guess:  [^syslog_attempt_1589892856650_0073_1_08_000945_0] 
> {code}
> 2020-05-25 13:13:14,617 [ERROR] [ShuffleRunner {Map_16}] |impl.ShuffleManager|: Killing self as previous attempt data could have been consumed
> java.io.IOException: Previous event already got scheduled for InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, spillId=0]. Previous attempt's data could have been already merged to memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
>         at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.validateInputAttemptForPipelinedShuffle(ShuffleManager.java:503)
>         at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.constructFetcherForHost(ShuffleManager.java:551)
>         at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:455)
>         at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager$RunShuffleCallable.callInternal(ShuffleManager.java:398)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>         at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> The interesting part is that the failing reducer task attempt above runs 4 minutes later than the successful, second mapper task attempt, so I think this is not a case of unfortunate scheduling; rather, the pipelined shuffle is not prepared for fetching from killed and reattempted inputs. For example, the reducer task tries to fetch input from attempt_1589892856650_0073_1_07_000100_0_10003_0, which belongs to the first, failing map attempt, but this task starts 4 minutes after there is already a successful attempt: attempt_1589892856650_0073_1_07_000100_1.
> {code}
> 2020-05-25 13:13:14,556 [INFO] [Fetcher_B {Map_16} #3] |HttpConnection.url|: for url=http://ctr-e141-1563959304486-69251-01-000009.hwx.site:13562/mapOutput?job=job_1589892856650_0073&dag=1&reduce=945&map=attempt_1589892856650_0073_1_07_000028_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_0,attempt_1589892856650_0073_1_07_000052_0_10002_0,attempt_1589892856650_0073_1_07_000100_0_10003_0,attempt_1589892856650_0073_1_07_000108_0_10002_0,attempt_1589892856650_0073_1_07_000036_0_10003_0,attempt_1589892856650_0073_1_07_000092_0_10002_0,attempt_1589892856650_0073_1_07_000076_0_10003_0,attempt_1589892856650_0073_1_07_000012_0_10002_0,attempt_1589892856650_0073_1_07_000044_0_10003_0,attempt_1589892856650_0073_1_07_000004_0_10003_0,attempt_1589892856650_0073_1_07_000068_0_10003_0,attempt_1589892856650_0073_1_07_000060_0_10002_0,attempt_1589892856650_0073_1_07_000020_0_10003_0,attempt_1589892856650_0073_1_07_000084_0_10003_1,attempt_1589892856650_0073_1_07_000108_0_10002_1,attempt_1589892856650_0073_1_07_000028_0_10003_1,attempt_1589892856650_0073_1_07_000100_0_10003_1,attempt_1589892856650_0073_1_07_000076_0_10003_1,attempt_1589892856650_0073_1_07_000052_0_10002_1,attempt_1589892856650_0073_1_07_000066_1_10024_0,attempt_1589892856650_0073_1_07_000066_1_10024_1&keepAlive=true sent hash and receievd reply 0 ms
> {code}
> It successfully fetches an input spill from the first, failing task attempt:
> {code}
> 2020-05-25 13:13:14,562 [INFO] [Fetcher_B {Map_16} #3] |ShuffleManager.fetch|: Completed fetch for attempt: {100, 0, attempt_1589892856650_0073_1_07_000100_0_10003_0, 1, 0} to MEMORY, csize=18631, d
> {code}
> Then comes the failure mentioned above, caused by the [fail-fast check|https://github.com/apache/tez/blob/master/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java#L495] on differing attempt numbers:
> {code}
> java.io.IOException: Previous event already got scheduled for InputAttemptIdentifier [inputIdentifier=100, attemptNumber=1, pathComponent=attempt_1589892856650_0073_1_07_000100_1_10017_0, spillType=1, spillId=0]. Previous attempt's data could have been already merged to memory/disk outputs.  Killing (self) this task early. currentAttemptNum=0, eventsProcessed={0, 1}, scheduledForDownload=true, newAttemptNum=1
> {code}


