You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/07/30 08:19:00 UTC
[jira] [Commented] (SPARK-24955) spark continuing to execute on a
task despite not reading all data from a downed machine
[ https://issues.apache.org/jira/browse/SPARK-24955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561617#comment-16561617 ]
Hyukjin Kwon commented on SPARK-24955:
--------------------------------------
(please avoid to set the target version which is usually reversed for committers)
> spark continuing to execute on a task despite not reading all data from a downed machine
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-24955
> URL: https://issues.apache.org/jira/browse/SPARK-24955
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Shuffle
> Affects Versions: 2.3.0
> Reporter: San Tung
> Priority: Major
>
> We've recently run into a few instances where a downed node has led to incomplete data, causing correctness issues, which we can reproduce some of the time.
> *Setup:*
> - we're currently on spark 2.3.0
> - we allow retries on failed tasks and stages
> - we use PySpark to perform these operations
> *Stages:*
> Simplistically, the job does the following:
> - Stage 1/2: computes a number of `(sha256 hash, 0, 1)` partitioned into 65536 partitions
> - Stage 3/4: computes a number of `(sha256 hash, 1, 0)` partitioned into 6408 partitions (one hash may exist in multiple partitions)
> - Stage 5:
> - repartitions stage 2 and stage 4 by the first 2 bytes of each hash, and find which ones are not in common (stage 2 hashes - stage 4 hashes).
> - store this partition into a persistent data source.
> *Failure Scenario:*
> - We take out one of the machines (do a forced shutdown, for example)
> - For some tasks, stage 5 will die immediately with one of the following:
> ** `ExecutorLostFailure (executor 24 exited caused by one of the running tasks) Reason: worker lost`
> ** `FetchFailed(BlockManagerId(24, [redacted], 36829, None), shuffleId=2, mapId=14377, reduceId=48402, message=`
> - these tasks are reused to calculate stage 1-2 and 3-4 again that were missing on downed nodes, which is correctly recalculated by spark.
> - However, some tasks still continue executing from Stage 5, seemingly missing stage 4 data, dumping incorrect data to the stage 5 data source. We noticed the subtract operation taking ~1-2 minutes after the machine goes down, and stores a lot more data than usual (which on inspection is wrong).
> - we've seen this happen with slightly different execution plans too which don't involve or-ing, but end up being some variant of missing some stage 4 data.
> However, we cannot reproduce this consistently - sometimes all tasks fail gracefully. Correctly downed nodes means all these tasks fail and re-work on stage 1-2/3-4. Note that this solution produces the correct results if machines stay alive!
> We were wondering if a machine going down can result in a state where a task could keep executing even though not all data has been fetched which gives us incorrect results (or if there is setting that allows this - we tried scanning spark configs up and down). This seems similar to https://issues.apache.org/jira/browse/SPARK-24160 (maybe we get an empty packet?), but it doesn't look like that was to explicitly resolve any known bug.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org