Posted to dev@spark.apache.org by Prem Sahoo <pr...@gmail.com> on 2024/02/29 15:04:41 UTC

When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

When a Spark job hits a FetchFailedException, it creates some duplicate data
and we also see some data missing. Please explain why. We have a scenario
where the Spark job reports a FetchFailedException because one of the data
nodes was rebooted in the middle of the job.

Because of this we now have some duplicate data and some missing data. Why is
Spark not handling this scenario correctly? We shouldn't miss any data and we
shouldn't create duplicate data.

I am using Spark 3.2.0.

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Jason,

I read your notes and the code at
https://issues.apache.org/jira/browse/SPARK-38388 that simulates the specific
repartition issue (SPARK-38388).

The code below, from the Jira linked above,

import scala.sys.process._
import org.apache.spark.TaskContext
// Needed outside spark-shell, which normally has these in scope already
import org.apache.spark.sql.functions.rand
import spark.implicits._

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, $"id").withColumn("val", rand()).repartition(100).map { row =>
  // On the first attempt only, tasks for partitions 98 and 99 run
  // `pkill -f java` to force a fetch failure and a retry.
  if (TaskContext.get.stageAttemptNumber == 0 &&
      TaskContext.get.attemptNumber == 0 &&
      TaskContext.get.partitionId > 97) {
    throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show

*contains a potential security risk* by using scala.sys.process to execute
the pkill -f java command. While the code aims to demonstrate the
repartition issue, using pkill is IMO unnecessary and risky. This could
potentially terminate critical processes on the cluster as well. Instead of
throwing an exception based on partition ID, you can try to filter out
unwanted partitions before applying the map transformation like below

// Intended to filter out partitions with ID >= 98; note that $"id" here is
// the row's id column, not the partition ID
val filteredDS = ds.filter($"id".lt(98))
filteredDS.map { row => TestObject(row.getLong(0), row.getDouble(1)) }

By using filteredDS for subsequent transformations or actions, you avoid
redundant processing and potential complications from the conditional logic
in the original map transformation. This approach is a safer simulation of
the repartition issue by only working with the filtered dataset, representing
the partitions that would have hypothetically succeeded.
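
For readers following the thread, here is a toy model of why the combination
Jason mentions in the quoted message below (non-deterministic row order plus
round-robin repartition()) can give both duplicates and missing rows after a
partial retry. It is plain Scala with no Spark dependency. roundRobin, the
four-row input and the two-reducer layout are all made up for illustration,
and Spark's real shuffle is far more involved than this.

```scala
object RoundRobinRetryDemo {
  // Toy round-robin: the i-th row a map task emits goes to reducer i % numReducers.
  def roundRobin(rows: Seq[Int], numReducers: Int): Map[Int, Seq[Int]] =
    rows.zipWithIndex
      .groupBy { case (_, idx) => idx % numReducers }
      .map { case (reducer, pairs) => reducer -> pairs.map(_._1) }

  // The same four rows, seen in a different order when the map task is re-run.
  val firstAttempt: Seq[Int] = Seq(1, 2, 3, 4)
  val retryAttempt: Seq[Int] = Seq(2, 1, 3, 4)

  // Assumption of this model: reducer 0 finished against the first attempt's
  // output before the failure, while reducer 1 is re-run and reads the
  // retried map output instead.
  val finalOutput: Seq[Int] =
    roundRobin(firstAttempt, 2)(0) ++ roundRobin(retryAttempt, 2)(1)

  def main(args: Array[String]): Unit = {
    val duplicated = finalOutput.diff(finalOutput.distinct).distinct
    val missing = firstAttempt.filterNot(x => finalOutput.contains(x))
    println(s"final output: $finalOutput")
    println(s"duplicated: $duplicated, missing: $missing")
  }
}
```

In this model row 1 ends up in the output twice and row 2 never appears,
which mirrors the symptom reported at the top of the thread.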

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Mon, 4 Mar 2024 at 18:26, Jason Xu <ja...@gmail.com> wrote:

> Hi Prem,
>
> From the symptoms (shuffle fetch failure, a few duplicate rows and a few
> missing rows), I think you might have run into this correctness bug:
> https://issues.apache.org/jira/browse/SPARK-38388.
>
> Node/shuffle failure is hard to avoid. I wonder if you have
> non-deterministic logic and are calling repartition() (round-robin
> partitioning) in your code? If you can avoid either of these, you can keep
> the issue from happening for now. A root-cause fix requires non-trivial
> effort, and I don't think there's a solution available yet.
>
> I have heard that there are community efforts to solve this issue, but I
> lack detailed information. Hopefully, someone with more knowledge can
> provide further insight.
>
> Best,
> Jason
>
> On Mon, Mar 4, 2024 at 9:41 AM Prem Sahoo <pr...@gmail.com> wrote:
>
>> super :(
>>
>> On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> "... in a nutshell, if a FetchFailedException occurs due to a data node
>>> reboot then it can create duplicate/missing data, so this is more of a
>>> hardware (env) issue than a Spark issue."
>>>
>>> As an overall conclusion your point is correct but again the answer is
>>> not binary.
>>>
>>> Spark core relies on a distributed file system to store data across data
>>> nodes. When Spark needs to process data, it fetches the required blocks
>>> from the data nodes. *FetchFailedException* means that Spark
>>> encountered an error while fetching data blocks from a data node. If a data
>>> node reboots unexpectedly, it becomes unavailable to Spark for a
>>> period. During this time, Spark might attempt to fetch data blocks from the
>>> unavailable node, resulting in the FetchFailedException. Depending on the
>>> timing and nature of the reboot and data access, this exception can lead
>>> to the following:
>>>
>>>    - Duplicate Data: If Spark retries the fetch operation successfully
>>>    after the reboot, it might end up processing the same data twice, leading
>>>    to duplicates.
>>>    - Missing Data: If Spark cannot fetch all required data blocks due
>>>    to the unavailable data node, some data might be missing from the
>>>    processing results.
>>>
>>> The root cause of this issue lies in the data node reboot itself. So we
>>> can conclude that it is not a problem with Spark core functionality but
>>> rather an environmental issue within the distributed storage system. You
>>> need to ensure that your nodes are stable and minimise unexpected reboots
>>> for whatever reason. Look at the host logs or run /usr/bin/dmesg to see
>>> what happened.
>>>
>>> Good luck
>>>
>>> Mich Talebzadeh,
>>>
>>>
>>> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <pr...@gmail.com> wrote:
>>>
>>>> Thanks Mich. In a nutshell, if a FetchFailedException occurs due to a data
>>>> node reboot then it can create duplicate/missing data, so this is more of
>>>> a hardware (env) issue than a Spark issue.
>>>>
>>>>
>>>>
>>>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It seems to me that the issues relate to the points below.
>>>>>
>>>>> *<Prem> I think when a task fails midway and a retry task starts and
>>>>> completes, it may create duplicates, as the failed task has some data and
>>>>> the retry task has the full data. But my question is why Spark keeps the
>>>>> delta data; or, according to you, if the speculative and original tasks
>>>>> both complete, Spark generally kills one of them to get rid of the
>>>>> duplicate data. When a data node is rebooted, Spark's fault tolerance
>>>>> should go to other nodes, shouldn't it? Then why is data missing?*
>>>>>
>>>>> Spark is designed to be fault-tolerant through lineage and
>>>>> recomputation. However, there are scenarios where speculative execution or
>>>>> task retries might lead to duplicated or missing data. So what are these?
>>>>>
>>>>> - Task Failure and Retry: You are correct that a failed task might
>>>>> have processed some data before encountering the FetchFailedException. If a
>>>>> retry succeeds, it would process the entire data partition again, leading
>>>>> to duplicates. When a task fails, Spark may recompute the lost data by
>>>>> recomputing the lost task on another node.  The output of the retried task
>>>>> is typically combined with the output of the original task during the final
>>>>> stage of the computation. This combination is done to handle scenarios
>>>>> where the original task partially completed and generated some output
>>>>> before failing. Spark does not intentionally store partially processed
>>>>> data. However, due to retries and speculative execution, duplicate
>>>>> processing can occur. To the best of my knowledge, Spark itself doesn't
>>>>> have a mechanism to identify and eliminate duplicates automatically. While
>>>>> Spark might sometimes kill speculative tasks if the original one finishes,
>>>>> it is not a guaranteed behavior. This depends on various factors like
>>>>> scheduling and task dependencies.
>>>>>
>>>>> - Speculative Execution: Spark supports speculative execution, where
>>>>> the same task is launched on multiple executors simultaneously. The result
>>>>> of the first completed task is used, and the others are usually killed to
>>>>> avoid duplicated results. However, speculative execution might introduce
>>>>> some duplication in the final output if tasks on different executors
>>>>> complete successfully.
>>>>>
>>>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to
>>>>> data corruption or loss, that data might be unavailable to Spark. Even with
>>>>> fault tolerance, Spark cannot recover completely missing data. Fault
>>>>> tolerance focuses on recovering from issues like executor failures, not
>>>>> data loss on storage nodes. Overall, Spark's fault tolerance is designed to
>>>>> handle executor failures by rescheduling tasks on other available executors
>>>>> and temporary network issues by retrying fetches based on configuration.
>>>>>
>>>>> Here are some things to consider:
>>>>>
>>>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower
>>>>> value such as 1 or 2 to reduce the chance of duplicate processing
>>>>> attempts, if retries are suspected to be a source.
>>>>> - Disable speculative execution if needed: Consider disabling
>>>>> speculative execution (spark.speculation=false) if duplicates are a major
>>>>> concern. However, this might impact performance.
>>>>> - Data persistence: As mentioned in the previous reply, persist
>>>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
>>>>> is critical. This ensures data availability even during node failures.
>>>>> - Data validation checks: Implement data validation checks after
>>>>> processing to identify potential duplicates or missing data.
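
A rough sketch of the data validation bullet above, in plain Scala with no
Spark dependency. It assumes the job's output carries an id whose expected
values are known up front, which may not hold for every pipeline; OutputCheck,
Report and check are hypothetical names, not Spark APIs. On a real table the
same idea would be a comparison of count(*) against count(distinct id), as the
Jira reproduction does.

```scala
object OutputCheck {
  // Ids written more than once, and expected ids that were never written.
  final case class Report(duplicates: Set[Long], missing: Set[Long])

  def check(expected: Set[Long], actual: Seq[Long]): Report = {
    // How many times each id shows up in the output
    val counts = actual.groupBy(identity).map { case (id, hits) => id -> hits.size }
    val duplicates = counts.collect { case (id, n) if n > 1 => id }.toSet
    val missing = expected -- counts.keySet
    Report(duplicates, missing)
  }

  def main(args: Array[String]): Unit = {
    val report = check(expected = Set(1L, 2L, 3L, 4L), actual = Seq(1L, 3L, 1L, 4L))
    println(s"duplicates: ${report.duplicates}, missing: ${report.missing}")
  }
}
```

A non-empty Report after a run that saw fetch failures would be the signal to
rerun the job or investigate further.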
>>>>> HTH
>>>>> Mich Talebzadeh,
>>>>>
>>>>>
>>>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <pr...@gmail.com> wrote:
>>>>>
>>>>>> Hello Mich,
>>>>>> thanks for your reply.
>>>>>>
>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>> retries, meaning that when Spark encounters a *FetchFailedException*, it
>>>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>>>> node a few times before marking it permanently unavailable. However, if the
>>>>>> rebooted node recovers quickly within this retry window, some executors
>>>>>> might successfully fetch the data after a retry. *This leads to
>>>>>> duplicate processing of the same data partition*.
>>>>>>
>>>>>> <Prem> The data node reboot takes more than 20 minutes and our config has
>>>>>> spark.network.timeout=300s, so we don't have duplicates for the above reason.
>>>>>>
>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>> speculatively execute tasks on different executors to improve
>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>> speculative task might be launched on another executor. This is where fun
>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>> successfully, *resulting in duplicates*. With regard to missing
>>>>>> data, if the data node reboot leads to data corruption or loss, some data
>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>> processing that missing data, leading to missing data in the final output.
>>>>>>
>>>>>> <Prem> I think when a task fails midway and a retry task starts and
>>>>>> completes, it may create duplicates, as the failed task has some data and
>>>>>> the retry task has the full data. But my question is why Spark keeps the
>>>>>> delta data; or, according to you, if the speculative and original tasks
>>>>>> both complete, Spark generally kills one of them to get rid of the
>>>>>> duplicate data. When a data node is rebooted, Spark's fault tolerance
>>>>>> should go to other nodes, shouldn't it? Then why is data missing?
>>>>>>
>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>>> some cases. You can consider persisting intermediate results to
>>>>>> reliable storage (like HDFS, GCS or another cloud store) to avoid data
>>>>>> loss in case of node failures. Your mileage may vary, as this adds
>>>>>> processing overhead, but it can ensure data integrity.
>>>>>>
>>>>>> <Prem> How will Spark handle these without a checkpoint, as that will
>>>>>> slow down the process? My data loss or duplication is due to a
>>>>>> FetchFailedException caused by the data node reboot.
>>>>>> I have a few configs to minimize FetchFailedException:
>>>>>> spark.network.timeout=300s
>>>>>> spark.reducer.maxReqsInFlight=4
>>>>>> spark.shuffle.io.retryWait=30s
>>>>>> spark.shuffle.io.maxRetries=3
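
For what it's worth, the Spark configuration docs describe the maximum delay
caused by fetch retries as maxRetries * retryWait. A quick back-of-the-envelope
check with the values above, set against the 20-minute reboot mentioned
earlier in this message (RetryWindow is a hypothetical helper written for this
note, not a Spark API):

```scala
object RetryWindow {
  // Upper bound on the time spent retrying one fetch: maxRetries * retryWait
  def maxRetryDelaySeconds(maxRetries: Int, retryWaitSeconds: Int): Int =
    maxRetries * retryWaitSeconds

  def main(args: Array[String]): Unit = {
    // spark.shuffle.io.maxRetries=3, spark.shuffle.io.retryWait=30s
    val delay = maxRetryDelaySeconds(maxRetries = 3, retryWaitSeconds = 30)
    val rebootSeconds = 20 * 60 // node reboot of roughly 20 minutes
    println(s"retry window: ${delay}s, reboot: ${rebootSeconds}s")
    println(s"retries outlast the reboot: ${delay >= rebootSeconds}")
  }
}
```

With these settings the retry window is 90 seconds against a reboot of about
1200 seconds, so the fetch retries would be exhausted long before the node
comes back.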
>>>>>>
>>>>>> When we get a FetchFailedException due to a data node reboot, Spark
>>>>>> should handle it gracefully, shouldn't it?
>>>>>> Or how should we handle it?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Your point -> "When Spark job shows FetchFailedException it creates
>>>>>>> few duplicate data and we see few data also missing, please explain why.
>>>>>>> We have scenario when spark job complains *FetchFailedException* as
>>>>>>> one of the data node got rebooted middle of job running."
>>>>>>>
>>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>>> retries, meaning that when Spark encounters a *FetchFailedException*,
>>>>>>> it may retry fetching the data from the unavailable (the one being
>>>>>>> rebooted) node a few times before marking it permanently unavailable.
>>>>>>> However, if the rebooted node recovers quickly within this retry window,
>>>>>>> some executors might successfully fetch the data after a retry. *This
>>>>>>> leads to duplicate processing of the same data partition*.
>>>>>>>
>>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>>> speculatively execute tasks on different executors to improve
>>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>>> speculative task might be launched on another executor. This is where fun
>>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>>> successfully, *resulting in duplicates*. With regard to missing
>>>>>>> data, if the data node reboot leads to data corruption or loss, some data
>>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>>> processing that missing data, leading to missing data in the final output.
>>>>>>>
>>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>>>> some cases. You can consider persisting intermediate results to
>>>>>>> reliable storage (like HDFS, GCS or another cloud store) to avoid data
>>>>>>> loss in case of node failures. Your mileage may vary, as this adds
>>>>>>> processing overhead, but it can ensure data integrity.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <pr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>> In the list of JIRAs I didn't find anything related to
>>>>>>>> FetchFailedException.
>>>>>>>>
>>>>>>>> As mentioned above:
>>>>>>>>
>>>>>>>> "When Spark job shows FetchFailedException it creates few duplicate
>>>>>>>> data and we see few data also missing , please explain why. We have a
>>>>>>>> scenario when spark job complains FetchFailedException as one of the data
>>>>>>>> nodes got rebooted in the middle of job running .
>>>>>>>> Now due to this we have few duplicate data and few missing data .
>>>>>>>> Why is spark not handling this scenario correctly ? kind of we shouldn't
>>>>>>>> miss any data and we shouldn't create duplicate data . "
>>>>>>>>
>>>>>>>> We have to rerun the job to fix this data quality issue.
>>>>>>>> Please let me know why this case is not handled properly by Spark.
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <
>>>>>>>> dongjoon.hyun@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please use the URL as the full string, including the '()' part.
>>>>>>>>>
>>>>>>>>> Or you can search directly in ASF Jira with the 'Spark' project and
>>>>>>>>> three labels: 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>>>
>>>>>>>>> Dongjoon
>>>>>>>>>
>>>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <pr...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Dongjoon,
>>>>>>>>>> Thanks for emailing me.
>>>>>>>>>> Could you please share a list of fixes  as the link provided by
>>>>>>>>>> you is not working.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <
>>>>>>>>>> dongjoon@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> If you are observing correctness issues, you may hit some old
>>>>>>>>>>> (and fixed) correctness issues.
>>>>>>>>>>>
>>>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>>>> correctness issues.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>>>
>>>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>>>
>>>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because
>>>>>>>>>>> Apache Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful if you could report any correctness issues with
>>>>>>>>>>> Apache Spark 3.5.1.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dongjoon.
>>>>>>>>>>>

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
Thanks Jason for the detailed information and the bug associated with it.
Hopefully someone can provide more information about this pressing issue.

On Mon, Mar 4, 2024 at 1:26 PM Jason Xu <ja...@gmail.com> wrote:

> Hi Prem,
>
> From the symptom of shuffle fetch failure and few duplicate data and few
> missing data, I think you might run into this correctness bug:
> https://issues.apache.org/jira/browse/SPARK-38388.
>
> Node/shuffle failure is hard to avoid, I wonder if you have
> non-deterministic logic and calling repartition() (round robin
> partitioning) in your code? If you can avoid either of these, you can avoid
> the issue from happening for now. To root fix the issue, it requires a
> non-trivial effort, I don't think there's a solution available yet.
>
> I have heard that there are community efforts to solve this issue, but I
> lack detailed information. Hopefully, someone with more knowledge can
> provide further insight.
>
> Best,
> Jason
>
> On Mon, Mar 4, 2024 at 9:41 AM Prem Sahoo <pr...@gmail.com> wrote:
>
>> super :(
>>
>> On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> "... in a nutshell  if fetchFailedException occurs due to data node
>>> reboot then it  can create duplicate / missing data  .   so this is more of
>>> hardware(env issue ) rather than spark issue ."
>>>
>>> As an overall conclusion your point is correct but again the answer is
>>> not binary.
>>>
>>> Spark core relies on a distributed file system to store data across data
>>> nodes. When Spark needs to process data, it fetches the required blocks
>>> from the data nodes.* FetchFailedException*: means  that Spark
>>> encountered an error while fetching data blocks from a data node. If a data
>>> node reboots unexpectedly, it becomes unavailable to Spark for a
>>> period. During this time, Spark might attempt to fetch data blocks from the
>>> unavailable node, resulting in the FetchFailedException.. Depending on the
>>> timing and nature of the reboot and data access, this exception can lead
>>> to:the following:
>>>
>>>    - Duplicate Data: If Spark retries the fetch operation successfully
>>>    after the reboot, it might end up processing the same data twice, leading
>>>    to duplicates.
>>>    - Missing Data: If Spark cannot fetch all required data blocks due
>>>    to the unavailable data node, some data might be missing from the
>>>    processing results.
>>>
>>> The root cause of this issue lies in the data node reboot itself. So we
>>> can conclude that it is not a  problem with Spark core functionality but
>>> rather an environmental issue within the distributed storage systemL  You
>>> need to ensure that your nodes are stable and minimise unexpected reboots
>>> for whatever reason. Look at the host logs  or run /usr/bin/dmesg to see
>>> what happened..
>>>
>>> Good luck
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <pr...@gmail.com> wrote:
>>>
>>>> thanks Mich, in a nutshell  if fetchFailedException occurs due to data
>>>> node reboot then it  can create duplicate / missing data  .   so this is
>>>> more of hardware(env issue ) rather than spark issue .
>>>>
>>>>
>>>>
>>>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It seems to me that there are issues related to below
>>>>>
>>>>> *<Prem> I think when a task failed in between  and retry task started
>>>>> and completed it may create duplicate as failed task has some data + retry
>>>>> task has  full data.  but my question is why spark keeps delta data or
>>>>> according to you if speculative and original task completes generally spark
>>>>> kills one of the tasks to get rid of dups data.  when a data node is
>>>>> rebooted then spark fault tolerant should go to other nodes isn't it ? then
>>>>> why it has missing data.*
>>>>>
>>>>> Spark is designed to be fault-tolerant through lineage and
>>>>> recomputation. However, there are scenarios where speculative execution or
>>>>> task retries might lead to duplicated or missing data. So what are these?
>>>>>
>>>>> - Task Failure and Retry: You are correct that a failed task might
>>>>> have processed some data before encountering the FetchFailedException. If a
>>>>> retry succeeds, it would process the entire data partition again, leading
>>>>> to duplicates. When a task fails, Spark may recompute the lost data by
>>>>> recomputing the lost task on another node.  The output of the retried task
>>>>> is typically combined with the output of the original task during the final
>>>>> stage of the computation. This combination is done to handle scenarios
>>>>> where the original task partially completed and generated some output
>>>>> before failing. Spark does not intentionally store partially processed
>>>>> data. However, due to retries and speculative execution, duplicate
>>>>> processing can occur. To the best of my knowledge, Spark itself doesn't
>>>>> have a mechanism to identify and eliminate duplicates automatically. While
>>>>> Spark might sometimes kill speculative tasks if the original one finishes,
>>>>> it is not a guaranteed behavior. This depends on various factors like
>>>>> scheduling and task dependencies.
>>>>>
>>>>> - Speculative Execution: Spark supports speculative execution, where
>>>>> the same task is launched on multiple executors simultaneously. The result
>>>>> of the first completed task is used, and the others are usually killed to
>>>>> avoid duplicated results. However, speculative execution might introduce
>>>>> some duplication in the final output if tasks on different executors
>>>>> complete successfully.
>>>>>
>>>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to
>>>>> data corruption or loss, that data might be unavailable to Spark. Even with
>>>>> fault tolerance, Spark cannot recover completely missing data. Fault
>>>>> tolerance focuses on recovering from issues like executor failures, not
>>>>> data loss on storage nodes. Overall, Spark's fault tolerance is designed to
>>>>> handle executor failures by rescheduling tasks on other available executors
>>>>> and temporary network issues by retrying fetches based on configuration.
>>>>>
>>>>> Here are some stuff to consider:
>>>>>
>>>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower
>>>>> value such as  1 or 2 to reduce the chance of duplicate processing
>>>>> attempts, if retries are suspected to be a source.
>>>>> - Disable speculative execution if needed: Consider disabling
>>>>> speculative execution (spark.speculation=false) if duplicates are a major
>>>>> concern. However, this might impact performance.
>>>>> - Data persistence: As mentioned in the previous reply, persist
>>>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
>>>>> is critical. This ensures data availability even during node failures.
>>>>> - Data validation checks: Implement data validation checks after
>>>>> processing to identify potential duplicates or missing data.
>>>>> HTH
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <pr...@gmail.com> wrote:
>>>>>
>>>>>> Hello Mich,
>>>>>> thanks for your reply.
>>>>>>
>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>>>> node a few times before marking it permanently unavailable. However, if the
>>>>>> rebooted node recovers quickly within this retry window, some executors
>>>>>> might successfully fetch the data after a retry. *This leads to
>>>>>> duplicate processing of the same data partition*.
>>>>>>
>>>>>> <Prem> data node reboot is taking more than 20 mins and our config
>>>>>> spark.network.timeout=300s so we don't have dupls for the above reason.
>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>> speculatively execute tasks on different executors to improve
>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>> speculative task might be launched on another executor. This is where fun
>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>> successfully,* resulting in duplicates*. With regard to missing
>>>>>> data, if the data node reboot leads to data corruption or loss, some data
>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>> processing that missing data, leading to missing data in the final output.
>>>>>>
>>>>>> <Prem> I think that when a task fails midway and a retry task starts
>>>>>> and completes, it may create duplicates, as the failed task has some data
>>>>>> and the retry task has the full data. But my question is why Spark keeps
>>>>>> the delta data. Or, according to you, if the speculative and original tasks
>>>>>> both complete, Spark generally kills one of them to get rid of duplicate
>>>>>> data. When a data node is rebooted, Spark's fault tolerance should go to
>>>>>> other nodes, shouldn't it? Then why is there missing data?
>>>>>>
>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>>> some cases. You can consider persisting intermediate data results to a
>>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>>>> processing overhead but can ensure data integrity.
>>>>>>
>>>>>> <Prem> How will Spark handle these without a checkpoint, as checkpointing
>>>>>> will slow down the process? My data loss and duplication are due to the
>>>>>> FetchFailedException caused by the data node reboot.
>>>>>> I have a few configs to minimize FetchFailedException:
>>>>>> spark.network.timeout=300s
>>>>>> spark.reducer.maxReqsInFlight=4
>>>>>> spark.shuffle.io.retryWait=30s
>>>>>> spark.shuffle.io.maxRetries=3
>>>>>>
>>>>>> When we get a FetchFailedException due to a data node reboot, Spark
>>>>>> should handle it gracefully, shouldn't it?
>>>>>> Or how should we handle it?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Your point -> "When Spark job shows FetchFailedException it creates
>>>>>>> few duplicate data and we see few data also missing, please explain why.
>>>>>>> We have scenario when spark job complains *FetchFailedException as
>>>>>>> one of the data node got rebooted middle of job running*."
>>>>>>>
>>>>>>> As an engineer I can chip in. You may have partial execution and
>>>>>>> retries  meaning when spark encounters a *FetchFailedException*,
>>>>>>> it  may retry fetching the data from the unavailable (the one being
>>>>>>> rebooted) node a few times before marking it permanently unavailable.
>>>>>>> However, if the rebooted node recovers quickly within this retry window,
>>>>>>> some executors might successfully fetch the data after a retry. *This
>>>>>>> leads to duplicate processing of the same data partition*.
>>>>>>>
>>>>>>> I am not sure this one applies to your spark version but spark may
>>>>>>> speculatively execute tasks on different executors to improve
>>>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>>>> speculative task might be launched on another executor. This is where fun
>>>>>>> and games start. If the unavailable node recovers before the speculative
>>>>>>> task finishes, both the original and speculative tasks might complete
>>>>>>> successfully, *resulting in duplicates*. With regard to missing
>>>>>>> data, if the data node reboot leads to data corruption or loss, some data
>>>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>>>> processing that missing data, leading to missing data in the final output.
>>>>>>>
>>>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>>>> data loss. You can adjust parameters like *spark.shuffle.io.retryWait*
>>>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>>>> some cases. You can consider persisting intermediate data results to a
>>>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>>>>> processing overhead but can ensure data integrity.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <pr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>> In the list of JIRAs I didn't find anything related to
>>>>>>>> FetchFailedException.
>>>>>>>>
>>>>>>>> as mentioned above
>>>>>>>>
>>>>>>>> "When Spark job shows FetchFailedException it creates few duplicate
>>>>>>>> data and we see few data also missing , please explain why. We have a
>>>>>>>> scenario when spark job complains FetchFailedException as one of the data
>>>>>>>> nodes got rebooted in the middle of job running .
>>>>>>>> Now due to this we have few duplicate data and few missing data .
>>>>>>>> Why is spark not handling this scenario correctly ? kind of we shouldn't
>>>>>>>> miss any data and we shouldn't create duplicate data . "
>>>>>>>>
>>>>>>>> We have to rerun the job again to fix this data quality issue .
>>>>>>>> Please let me know why this case is not handled properly by Spark ?
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <
>>>>>>>> dongjoon.hyun@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please use the URL as the full string, including the '()' part.
>>>>>>>>>
>>>>>>>>> Or you can search directly in ASF Jira with the 'Spark' project and
>>>>>>>>> three labels, 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>>>
>>>>>>>>> Dongjoon
>>>>>>>>>
>>>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <pr...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Dongjoon,
>>>>>>>>>> Thanks for emailing me.
>>>>>>>>>> Could you please share a list of fixes  as the link provided by
>>>>>>>>>> you is not working.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <
>>>>>>>>>> dongjoon@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> If you are observing correctness issues, you may hit some old
>>>>>>>>>>> (and fixed) correctness issues.
>>>>>>>>>>>
>>>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>>>> correctness issues.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>>>
>>>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>>>
>>>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because
>>>>>>>>>>> Apache Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>>>>>>>
>>>>>>>>>>> It would be helpful if you could report any correctness issues with
>>>>>>>>>>> Apache Spark 3.5.1.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dongjoon.
>>>>>>>>>>>
>>>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>>>> > When Spark job shows FetchFailedException it creates few
>>>>>>>>>>> duplicate data and
>>>>>>>>>>> > we see few data also missing , please explain why. We have
>>>>>>>>>>> scenario when
>>>>>>>>>>> > spark job complains FetchFailedException as one of the data
>>>>>>>>>>> node got
>>>>>>>>>>> > rebooted middle of job running .
>>>>>>>>>>> >
>>>>>>>>>>> > Now due to this we have few duplicate data and few missing
>>>>>>>>>>> data . Why spark
>>>>>>>>>>> > is not handling this scenario correctly ? kind of we shouldn't
>>>>>>>>>>> miss any
>>>>>>>>>>> > data and we shouldn't create duplicate data .
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > I am using spark3.2.0 version.
>>>>>>>>>>> >

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Jason Xu <ja...@gmail.com>.
Hi Prem,

Given the symptoms (shuffle fetch failure plus a few duplicate rows and a few
missing rows), I think you might have run into this correctness bug:
https://issues.apache.org/jira/browse/SPARK-38388.

Node and shuffle failures are hard to avoid. I wonder whether your code has
non-deterministic logic and also calls repartition() (round-robin
partitioning)? If you can avoid either of these, you can avoid
the issue for now. A root fix requires non-trivial effort, and I don't
think there's a solution available yet.
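
To illustrate the interaction described above, here is a minimal plain-Scala sketch (no Spark required) of how round-robin assignment, combined with a changed row order across a stage retry, can produce both duplicate and missing rows, while key-based assignment does not. It is a toy model under stated assumptions, not Spark's actual shuffle code: roundRobin, byKey and mixedRead are hypothetical helpers, and reversing the row order merely stands in for non-deterministic logic.

```scala
// Toy simulation only; every name below is hypothetical, not a Spark API.

// Round-robin assignment: the i-th row (in arrival order) goes to partition i % n.
def roundRobin(rows: Seq[Long], numPartitions: Int): Map[Int, Seq[Long]] =
  rows.zipWithIndex
    .groupBy { case (_, idx) => idx % numPartitions }
    .map { case (p, pairs) => p -> pairs.map(_._1) }

// Key-based assignment: the target partition depends only on the row's id.
def byKey(rows: Seq[Long], numPartitions: Int): Map[Int, Seq[Long]] =
  rows.groupBy(id => (id % numPartitions).toInt)

val ids = Seq(0L, 1L, 2L, 3L, 4L, 5L)

// The first attempt sees the rows in one order; the retried attempt sees
// another (assumption: stands in for a non-deterministic upstream step).
val attempt1 = ids
val attempt2 = ids.reverse

// Downstream partition 0 already consumed the output of attempt 1, while
// partitions 1 and 2 read the regenerated output of attempt 2.
def mixedRead(assign: (Seq[Long], Int) => Map[Int, Seq[Long]]): Seq[Long] = {
  val first = assign(attempt1, 3)
  val retry = assign(attempt2, 3)
  (first.getOrElse(0, Seq.empty[Long]) ++
    retry.getOrElse(1, Seq.empty[Long]) ++
    retry.getOrElse(2, Seq.empty[Long])).sorted
}

val rr = mixedRead(roundRobin) // 0, 0, 1, 3, 3, 4: ids 0 and 3 duplicated, 2 and 5 missing
val keyed = mixedRead(byKey)   // 0, 1, 2, 3, 4, 5: every id exactly once

println(s"round-robin: $rr")
println(s"key-based:   $keyed")
```

In a real job, the analogous change would be to repartition by a column, as in the repartition(100, $"id") call from the SPARK-38388 reproduction, rather than calling repartition(100) on non-deterministic input. Whether that is practical depends on the job.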

I have heard that there are community efforts to solve this issue, but I
lack detailed information. Hopefully, someone with more knowledge can
provide further insight.

Best,
Jason

On Mon, Mar 4, 2024 at 9:41 AM Prem Sahoo <pr...@gmail.com> wrote:

> super :(

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
super :(

On Mon, Mar 4, 2024 at 6:19 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> "... in a nutshell  if fetchFailedException occurs due to data node reboot
> then it  can create duplicate / missing data  .   so this is more of
> hardware(env issue ) rather than spark issue ."
>
> As an overall conclusion your point is correct but again the answer is not
> binary.
>
> Spark core relies on a distributed file system to store data across data
> nodes. When Spark needs to process data, it fetches the required blocks
> from the data nodes. *FetchFailedException* means that Spark
> encountered an error while fetching data blocks from a data node. If a data
> node reboots unexpectedly, it becomes unavailable to Spark for a
> period. During this time, Spark might attempt to fetch data blocks from the
> unavailable node, resulting in the FetchFailedException. Depending on the
> timing and nature of the reboot and data access, this exception can lead
> to the following:
>
>    - Duplicate Data: If Spark retries the fetch operation successfully
>    after the reboot, it might end up processing the same data twice, leading
>    to duplicates.
>    - Missing Data: If Spark cannot fetch all required data blocks due to
>    the unavailable data node, some data might be missing from the processing
>    results.
>
> The root cause of this issue lies in the data node reboot itself. So we
> can conclude that it is not a problem with Spark core functionality but
> rather an environmental issue within the distributed storage system. You
> need to ensure that your nodes are stable and minimise unexpected reboots
> for whatever reason. Look at the host logs or run /usr/bin/dmesg to see
> what happened.
>
> Good luck
>
> Mich Talebzadeh,
>
>
> On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <pr...@gmail.com> wrote:
>
>> Thanks Mich. In a nutshell, if a FetchFailedException occurs due to a data
>> node reboot, it can create duplicate or missing data, so this is
>> more of a hardware (environment) issue than a Spark issue.
>>
>>
>>
>> On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> It seems to me that there are issues related to below
>>>
>>> *<Prem> I think that when a task fails midway and a retry task starts
>>> and completes, it may create duplicates, as the failed task has some data
>>> and the retry task has the full data. But my question is why Spark keeps
>>> the delta data. Or, according to you, if the speculative and original tasks
>>> both complete, Spark generally kills one of them to get rid of duplicate
>>> data. When a data node is rebooted, Spark's fault tolerance should go to
>>> other nodes, shouldn't it? Then why is there missing data?*
>>>
>>> Spark is designed to be fault-tolerant through lineage and
>>> recomputation. However, there are scenarios where speculative execution or
>>> task retries might lead to duplicated or missing data. So what are these?
>>>
>>> - Task Failure and Retry: You are correct that a failed task might have
>>> processed some data before encountering the FetchFailedException. If a
>>> retry succeeds, it would process the entire data partition again, leading
>>> to duplicates. When a task fails, Spark may recompute the lost data by
>>> recomputing the lost task on another node.  The output of the retried task
>>> is typically combined with the output of the original task during the final
>>> stage of the computation. This combination is done to handle scenarios
>>> where the original task partially completed and generated some output
>>> before failing. Spark does not intentionally store partially processed
>>> data. However, due to retries and speculative execution, duplicate
>>> processing can occur. To the best of my knowledge, Spark itself doesn't
>>> have a mechanism to identify and eliminate duplicates automatically. While
>>> Spark might sometimes kill speculative tasks if the original one finishes,
>>> it is not a guaranteed behavior. This depends on various factors like
>>> scheduling and task dependencies.
>>>
>>> - Speculative Execution: Spark supports speculative execution, where the
>>> same task is launched on multiple executors simultaneously. The result of
>>> the first completed task is used, and the others are usually killed to
>>> avoid duplicated results. However, speculative execution might introduce
>>> some duplication in the final output if tasks on different executors
>>> complete successfully.
>>>
>>> - Node Reboots and Fault Tolerance: If the data node reboot leads to
>>> data corruption or loss, that data might be unavailable to Spark. Even with
>>> fault tolerance, Spark cannot recover completely missing data. Fault
>>> tolerance focuses on recovering from issues like executor failures, not
>>> data loss on storage nodes. Overall, Spark's fault tolerance is designed to
>>> handle executor failures by rescheduling tasks on other available executors
>>> and temporary network issues by retrying fetches based on configuration.
>>>
>>> Here are some stuff to consider:
>>>
>>> - Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value
>>> such as  1 or 2 to reduce the chance of duplicate processing attempts, if
>>> retries are suspected to be a source.
>>> - Disable speculative execution if needed: Consider disabling
>>> speculative execution (spark.speculation=false) if duplicates are a major
>>> concern. However, this might impact performance.
>>> - Data persistence: As mentioned in the previous reply, persist
>>> intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
>>> is critical. This ensures data availability even during node failures.
>>> - Data validation checks: Implement data validation checks after
>>> processing to identify potential duplicates or missing data.
>>> HTH
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Sat, 2 Mar 2024 at 01:43, Prem Sahoo <pr...@gmail.com> wrote:
>>>
>>>> Hello Mich,
>>>> thanks for your reply.
>>>>
>>>> As an engineer I can chip in. You may have partial execution and
>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>> node a few times before marking it permanently unavailable. However, if the
>>>> rebooted node recovers quickly within this retry window, some executors
>>>> might successfully fetch the data after a retry. *This leads to
>>>> duplicate processing of the same data partition*.
>>>>
>>>> <Prem> data node reboot is taking more than 20 mins and our config
>>>> spark.network.timeout=300s so we don't have dupls for the above reason.
>>>> I am not sure this one applies to your spark version but spark may
>>>> speculatively execute tasks on different executors to improve
>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>> speculative task might be launched on another executor. This is where fun
>>>> and games start. If the unavailable node recovers before the speculative
>>>> task finishes, both the original and speculative tasks might complete
>>>> successfully,* resulting in duplicates*. With regard to missing data,
>>>> if the data node reboot leads to data corruption or loss, some data
>>>> partitions might be completely unavailable. In this case, spark may skip
>>>> processing that missing data, leading to missing data in the final output.
>>>>
>>>> <Prem> I think when a task failed in between  and retry task started
>>>> and completed it may create duplicate as failed task has some data + retry
>>>> task has  full data.  but my question is why spark keeps delta data or
>>>> according to you if speculative and original task completes generally spark
>>>> kills one of the tasks to get rid of dups data.  when a data node is
>>>> rebooted then spark fault tolerant should go to other nodes isn't it ? then
>>>> why it has missing data.
>>>> Potential remedies: Spark offers some features to mitigate these
>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>> data loss:. You can adjust parameters like *spark.shuffle.retry.wa*it
>>>> and *spark.speculation* to control retry attempts and speculative
>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>> some cases. You can consider persisting intermediate data results to a
>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>> processing overhead but can ensure data integrity.
>>>>
>>>> <Prem> How spark will handle these without a checkpoint as it will slow
>>>> down the process .  I have data loss or duplication is due to
>>>> fetchFailedException as a part of data node reboot.
>>>> I have few config to minimize fetchFailedException
>>>> spark.network.timeout=300s
>>>> spark.reducer.maxReqsInFlight=4
>>>> spark.shuffle.io.retryWait=30s
>>>> spark.shuffle.io.maxRetries=3
>>>>
>>>> When we get a fetchFailedException due to data node reboot then spark
>>>> should handle it gracefully isn't it ?
>>>> or how to handle it ?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 1, 2024 at 5:35 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Your point -> "When Spark job shows FetchFailedException it creates
>>>>> few duplicate data and  we see few data also missing , please explain why.
>>>>> We have scenario when  spark job complains *FetchFailedException as
>>>>> one of the data node got ** rebooted middle of job running ."*
>>>>>
>>>>> As an engineer I can chip in. You may have partial execution and
>>>>> retries  meaning when spark encounters a *FetchFailedException*, it
>>>>> may retry fetching the data from the unavailable (the one being rebooted)
>>>>> node a few times before marking it permanently unavailable. However, if the
>>>>> rebooted node recovers quickly within this retry window, some executors
>>>>> might successfully fetch the data after a retry. *This leads to
>>>>> duplicate processing of the same data partition*.
>>>>>
>>>>> I am not sure this one applies to your spark version but spark may
>>>>> speculatively execute tasks on different executors to improve
>>>>> performance. If a task fails due to the *FetchFailedException*, a
>>>>> speculative task might be launched on another executor. This is where fun
>>>>> and games start. If the unavailable node recovers before the speculative
>>>>> task finishes, both the original and speculative tasks might complete
>>>>> successfully,* resulting in duplicates*. With regard to missing data,
>>>>> if the data node reboot leads to data corruption or loss, some data
>>>>> partitions might be completely unavailable. In this case, spark may skip
>>>>> processing that missing data, leading to missing data in the final output.
>>>>>
>>>>> Potential remedies: Spark offers some features to mitigate these
>>>>> issues, but it might not guarantee complete elimination of duplicates or
>>>>> data loss:. You can adjust parameters like *spark.shuffle.retry.wa*it
>>>>> and *spark.speculation* to control retry attempts and speculative
>>>>> execution behavior. Lineage tracking is there to help. Spark can track data
>>>>> lineage, allowing you to identify potentially corrupted or missing data in
>>>>> some cases. You can consider persisting intermediate data results to a
>>>>> reliable storage (like HDFS or GCS or another cloud storage) to avoid data
>>>>> loss in case of node failures.  Your mileage varies as it adds additional
>>>>> processing overhead but can ensure data integrity.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <pr...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>> in the list of JIRAs i didn't find anything related to
>>>>>> fetchFailedException.
>>>>>>
>>>>>> as mentioned above
>>>>>>
>>>>>> "When Spark job shows FetchFailedException it creates few duplicate
>>>>>> data and we see few data also missing , please explain why. We have a
>>>>>> scenario when spark job complains FetchFailedException as one of the data
>>>>>> nodes got rebooted in the middle of job running .
>>>>>> Now due to this we have few duplicate data and few missing data . Why
>>>>>> is spark not handling this scenario correctly ? kind of we shouldn't miss
>>>>>> any data and we shouldn't create duplicate data . "
>>>>>>
>>>>>> We have to rerun the job again to fix this data quality issue .
>>>>>> Please let me know why this case is not handled properly by Spark ?
>>>>>>
>>>>>> On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <
>>>>>> dongjoon.hyun@gmail.com> wrote:
>>>>>>
>>>>>>> Please use the url as thr full string including '()' part.
>>>>>>>
>>>>>>> Or you can seach directly at ASF Jira with 'Spark' project and three
>>>>>>> labels, 'Correctness', 'correctness' and 'data-loss'.
>>>>>>>
>>>>>>> Dongjoon
>>>>>>>
>>>>>>> On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <pr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Dongjoon,
>>>>>>>> Thanks for emailing me.
>>>>>>>> Could you please share a list of fixes  as the link provided by you
>>>>>>>> is not working.
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <do...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> If you are observing correctness issues, you may hit some old (and
>>>>>>>>> fixed) correctness issues.
>>>>>>>>>
>>>>>>>>> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31
>>>>>>>>> correctness issues.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>>>>>>>>>
>>>>>>>>> There are more fixes in 3.3 and 3.4 and 3.5, too.
>>>>>>>>>
>>>>>>>>> Please use the latest version, Apache Spark 3.5.1, because Apache
>>>>>>>>> Spark 3.2 and 3.3 are in the End-Of-Support status of the community.
>>>>>>>>>
>>>>>>>>> It would be help if you can report any correctness issues with
>>>>>>>>> Apache Spark 3.5.1.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dongjoon.
>>>>>>>>>
>>>>>>>>> On 2024/02/29 15:04:41 Prem Sahoo wrote:
>>>>>>>>> > When Spark job shows FetchFailedException it creates few
>>>>>>>>> duplicate data and
>>>>>>>>> > we see few data also missing , please explain why. We have
>>>>>>>>> scenario when
>>>>>>>>> > spark job complains FetchFailedException as one of the data node
>>>>>>>>> got
>>>>>>>>> > rebooted middle of job running .
>>>>>>>>> >
>>>>>>>>> > Now due to this we have few duplicate data and few missing data
>>>>>>>>> . Why spark
>>>>>>>>> > is not handling this scenario correctly ? kind of we shouldn't
>>>>>>>>> miss any
>>>>>>>>> > data and we shouldn't create duplicate data .
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > I am using spark3.2.0 version.
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>>>>>>>>
>>>>>>>>>

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Mich Talebzadeh <mi...@gmail.com>.
"... in a nutshell, if a FetchFailedException occurs due to a data node
reboot, it can create duplicate or missing data, so this is more of a
hardware (environment) issue than a Spark issue."

As an overall conclusion your point is correct, but again the answer is not
binary.

Spark core relies on a distributed file system to store data across data
nodes. When Spark needs to process data, it fetches the required blocks
from those nodes. *FetchFailedException* means that Spark encountered an
error while fetching blocks from a node; strictly speaking these are the
shuffle blocks written by earlier map tasks and served from that node,
rather than file-system blocks. If a data node reboots unexpectedly, it
becomes unavailable to Spark for a period. During this time, Spark might
attempt to fetch blocks from the unavailable node, resulting in the
FetchFailedException. Depending on the timing and nature of the reboot and
data access, this exception can lead to the following:

   - Duplicate Data: If Spark retries the fetch operation successfully
   after the reboot, it might end up processing the same data twice, leading
   to duplicates.
   - Missing Data: If Spark cannot fetch all required data blocks due to
   the unavailable data node, some data might be missing from the processing
   results.
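A minimal sketch of a post-run check for both symptoms, written in plain
Scala so it runs without a cluster. It assumes every output row carries a
unique key drawn from a known set; OutputCheck, validate and the sample ids
are hypothetical names and data, not part of Spark. On a real table the same
two checks would be a group-by count on the key plus a comparison against
the expected keys.

```scala
// Hypothetical helper: compares the keys actually written with the keys expected.
object OutputCheck {
  final case class Report(duplicates: Map[Long, Int], missing: Set[Long])

  def validate(expected: Set[Long], written: Seq[Long]): Report = {
    // Count how often each key appears in the output.
    val counts: Map[Long, Int] =
      written.groupBy(identity).map { case (id, rows) => id -> rows.size }
    // Keys written more than once are duplicates.
    val duplicates = counts.filter { case (_, n) => n > 1 }
    // Expected keys that never appear in the output are missing.
    val missing = expected.diff(counts.keySet)
    Report(duplicates, missing)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative data only: id 2 was written twice, id 4 not at all.
    val report = validate(Set(1L, 2L, 3L, 4L), Seq(1L, 2L, 2L, 3L))
    println(s"duplicates = ${report.duplicates}, missing = ${report.missing}")
  }
}
```

A non-empty duplicates map or missing set after a run that logged
FetchFailedException is the signal that the output needs to be rewritten.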

The root cause of this issue lies in the data node reboot itself. So we can
conclude that it is not a problem with Spark core functionality but rather
an environmental issue within the distributed storage system. You need to
ensure that your nodes are stable and minimise unexpected reboots for
whatever reason. Look at the host logs or run /usr/bin/dmesg to see what
happened.
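To see why fetch retries alone could not ride out this particular reboot,
here is a rough calculation using the values reported earlier in the thread
(spark.shuffle.io.maxRetries=3, spark.shuffle.io.retryWait=30s, a reboot of
more than 20 minutes). The Spark configuration docs describe the maximum
delay caused by retrying as maxRetries * retryWait; the object and method
names below are made up for illustration, and the sketch ignores the time
each individual attempt takes to time out.

```scala
import scala.concurrent.duration._

// Hypothetical helper for the arithmetic only; it does not touch Spark.
object RetryWindow {
  // Upper bound on the time spent waiting between fetch retries.
  def maxRetryDelay(maxRetries: Int, retryWait: FiniteDuration): FiniteDuration =
    retryWait * maxRetries.toLong

  // True when the retry window is at least as long as the outage.
  def retriesCoverOutage(maxRetries: Int,
                         retryWait: FiniteDuration,
                         outage: FiniteDuration): Boolean =
    maxRetryDelay(maxRetries, retryWait) >= outage

  def main(args: Array[String]): Unit = {
    val window = maxRetryDelay(3, 30.seconds) // 3 * 30s = 90 seconds
    val covered = retriesCoverOutage(3, 30.seconds, 20.minutes)
    println(s"retry window = ${window.toSeconds}s, covers a 20 min reboot: $covered")
  }
}
```

With these numbers the retries are exhausted long before the node is back,
so the fetch fails and the lost shuffle output has to be recomputed
elsewhere rather than re-read from the rebooted node.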

Good luck

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


On Mon, 4 Mar 2024 at 01:30, Prem Sahoo <pr...@gmail.com> wrote:

> thanks Mich, in a nutshell  if fetchFailedException occurs due to data
> node reboot then it  can create duplicate / missing data  .   so this is
> more of hardware(env issue ) rather than spark issue .

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
Thanks Mich. In a nutshell, if a FetchFailedException occurs due to a data
node reboot, it can create duplicate or missing data, so this is more of a
hardware (environment) issue than a Spark issue.



On Sat, Mar 2, 2024 at 7:45 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Hi,
>
> It seems to me that there are issues related to below
>
> *<Prem> I think when a task failed in between  and retry task started and
> completed it may create duplicate as failed task has some data + retry task
> has  full data.  but my question is why spark keeps delta data or
> according to you if speculative and original task completes generally spark
> kills one of the tasks to get rid of dups data.  when a data node is
> rebooted then spark fault tolerant should go to other nodes isn't it ? then
> why it has missing data.*
>
> Spark is designed to be fault-tolerant through lineage and recomputation.
> However, there are scenarios where speculative execution or task retries
> might lead to duplicated or missing data. So what are these?

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

It seems to me that the open issues are the ones you raised below:

> <Prem> I think that when a task fails midway and a retry task starts and
> completes, it may create duplicates, as the failed task has some data and
> the retry task has the full data. But my question is why Spark keeps the
> partial data. Also, according to you, if the speculative and original
> tasks both complete, Spark generally kills one of them to get rid of
> duplicate data. And when a data node is rebooted, Spark's fault tolerance
> should go to other nodes, shouldn't it? Then why is data missing?

Spark is designed to be fault-tolerant through lineage and recomputation.
However, there are scenarios where speculative execution or task retries
might lead to duplicated or missing data. What are these?

- Task Failure and Retry: You are correct that a failed task might have
processed some data before encountering the FetchFailedException. If a
retry succeeds, it processes the entire data partition again, which can
lead to duplicates if output from the failed attempt is still visible in
the sink. When a task fails, Spark recomputes the lost data by rerunning
the task, possibly on another node. Spark does not intentionally store
partially processed data: only the output of one successful attempt per
partition is meant to be committed. Whether partial output from a failed
attempt is actually cleaned up depends on the output committer and the
sink. To the best of my knowledge, Spark itself doesn't have a mechanism
to identify and eliminate duplicate rows automatically after the fact.
While Spark normally kills the remaining speculative attempts once one
attempt finishes, a killed attempt may already have produced side effects,
so this is not a guarantee against duplicates. It depends on factors like
scheduling and task dependencies.

- Speculative Execution: Spark supports speculative execution, where an
extra copy of a slow-running task is launched on another executor. The
result of the first attempt to complete is used, and the others are
usually killed to avoid duplicated results. However, speculative execution
might introduce some duplication in the final output if attempts on
different executors both complete successfully and the sink is not
idempotent.

- Node Reboots and Fault Tolerance: If the data node reboot leads to data
corruption or loss, that data might be unavailable to Spark. Even with
fault tolerance, Spark cannot recover completely missing data. Fault
tolerance focuses on recovering from issues like executor failures, not
data loss on storage nodes. Overall, Spark's fault tolerance is designed to
handle executor failures by rescheduling tasks on other available executors
and temporary network issues by retrying fetches based on configuration.
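
As a minimal sketch of where the retry and speculation behaviour above is
configured, the Scala snippet below builds a session with explicit values
and prints them back. The values mirror the ones quoted elsewhere in this
thread; the object name, app name and local master are illustrative
assumptions, not a recommendation.

```scala
import org.apache.spark.sql.SparkSession

object RetrySettingsSketch {
  // Settings discussed in this thread; the values are examples only.
  val settings: Seq[(String, String)] = Seq(
    "spark.speculation"           -> "false", // no speculative task copies
    "spark.shuffle.io.maxRetries" -> "3",     // fetch retries before giving up
    "spark.shuffle.io.retryWait"  -> "30s",   // wait between fetch retries
    "spark.network.timeout"       -> "300s"   // default network timeout
  )

  // Assumption: a local master, used here only so the sketch is runnable.
  def buildSession(): SparkSession = {
    val builder = SparkSession.builder()
      .appName("retry-settings-sketch")
      .master("local[2]")
    settings.foldLeft(builder) { case (b, (k, v)) => b.config(k, v) }
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    // Read the values back from the SparkConf the context was started with.
    val conf = spark.sparkContext.getConf
    settings.foreach { case (k, _) => println(s"$k = ${conf.get(k)}") }
    spark.stop()
  }
}
```

On a real cluster the same keys would more commonly be passed with
--conf on spark-submit or set in spark-defaults.conf.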

Here are some things to consider:

- Minimize retries: Adjust spark.shuffle.io.maxRetries to a lower value
such as 1 or 2 to reduce the chance of duplicate processing attempts, if
retries are suspected to be a source.
- Disable speculative execution if needed: Consider disabling speculative
execution (spark.speculation=false) if duplicates are a major concern.
However, this might impact performance.
- Data persistence: As mentioned in the previous reply, persist
intermediate data to reliable storage (HDFS, GCS, etc.) if data integrity
is critical. This ensures data availability even during node failures.
- Data validation checks: Implement data validation checks after processing
to identify potential duplicates or missing data.
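
To make the last two points concrete, here is a minimal sketch of
checkpointing an intermediate result and then checking the output for
duplicate and missing ids. It assumes every row carries a unique id in the
range 0 until the expected count; the helper duplicateAndMissing, the
object name and the /tmp checkpoint path are hypothetical (on a cluster
the directory would be on HDFS, GCS or similar).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object OutputValidationSketch {
  // Returns (rows repeating an id already present, expected ids absent).
  // Assumes all ids in df belong to the expected set of `expectedIds` ids.
  def duplicateAndMissing(df: DataFrame, expectedIds: Long): (Long, Long) = {
    val total = df.count()
    val distinct = df.select("id").distinct().count()
    (total - distinct, expectedIds - distinct)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("output-validation-sketch")
      .master("local[2]") // assumption: local run for illustration
      .getOrCreate()

    // Placeholder path; use reliable storage (HDFS, GCS, ...) in practice.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    val expected = 1000L
    // checkpoint() materialises the data and truncates the lineage.
    val stable = spark.range(0, expected).toDF("id").checkpoint()

    // Simulated bad output: ids 0..9 are lost, ids 990..999 appear twice.
    val bad = stable.filter(col("id") >= 10)
      .union(stable.filter(col("id") >= 990))

    println(duplicateAndMissing(stable, expected)) // prints (0,0)
    println(duplicateAndMissing(bad, expected))    // prints (10,10)
    spark.stop()
  }
}
```

A check like this only detects the problem after the fact; it does not
prevent it, and the checkpoint adds an extra write of the data.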
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).



Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
Hello Mich,
thanks for your reply.

> As an engineer I can chip in. You may have partial execution and retries,
> meaning that when Spark encounters a *FetchFailedException*, it may retry
> fetching the data from the unavailable node (the one being rebooted) a few
> times before marking it permanently unavailable. However, if the rebooted
> node recovers quickly within this retry window, some executors might
> successfully fetch the data after a retry. *This leads to duplicate
> processing of the same data partition*.

<Prem> The data node reboot takes more than 20 minutes and our config has
spark.network.timeout=300s, so we don't have duplicates for the above
reason.

> I am not sure this one applies to your Spark version, but Spark may
> speculatively execute tasks on different executors to improve
> performance. If a task fails due to the *FetchFailedException*, a
> speculative task might be launched on another executor. This is where fun
> and games start. If the unavailable node recovers before the speculative
> task finishes, both the original and speculative tasks might complete
> successfully, *resulting in duplicates*. With regard to missing data, if
> the data node reboot leads to data corruption or loss, some data
> partitions might be completely unavailable. In this case, Spark may skip
> processing that missing data, leading to missing data in the final output.

<Prem> I think that when a task fails midway and a retry task starts and
completes, it may create duplicates, as the failed task has some data and
the retry task has the full data. But my question is why Spark keeps the
partial data. Also, according to you, if the speculative and original
tasks both complete, Spark generally kills one of them to get rid of
duplicate data. And when a data node is rebooted, Spark's fault tolerance
should go to other nodes, shouldn't it? Then why is data missing?

> Potential remedies: Spark offers some features to mitigate these issues,
> but they might not guarantee complete elimination of duplicates or data
> loss. You can adjust parameters like *spark.shuffle.io.retryWait* and
> *spark.speculation* to control retry attempts and speculative execution
> behavior. Lineage tracking is there to help. Spark can track data lineage,
> allowing you to identify potentially corrupted or missing data in some
> cases. You can consider persisting intermediate results to reliable
> storage (like HDFS, GCS or another cloud storage) to avoid data loss in
> case of node failures. Your mileage may vary, as it adds processing
> overhead, but it can ensure data integrity.

<Prem> How will Spark handle these cases without a checkpoint, since
checkpointing will slow down the process? My data loss and duplication are
due to the FetchFailedException caused by the data node reboot.
I have a few settings to minimize FetchFailedException:
spark.network.timeout=300s
spark.reducer.maxReqsInFlight=4
spark.shuffle.io.retryWait=30s
spark.shuffle.io.maxRetries=3
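
As a rough worked example of what these values imply: the Spark
configuration documentation describes the maximum delay caused by fetch
retries as maxRetries * retryWait. The sketch below does that arithmetic
for the settings above and compares it with the reported reboot time of
"more than 20 mins" (taken here as exactly 20 minutes, which is an
assumption). The object and method names are hypothetical.

```scala
import scala.concurrent.duration._

object FetchRetryWindow {
  // Upper bound on time spent retrying a failed fetch:
  // maxRetries * retryWait.
  def retryWindow(maxRetries: Int, retryWait: FiniteDuration): FiniteDuration =
    retryWait * maxRetries.toLong

  def main(args: Array[String]): Unit = {
    val window = retryWindow(maxRetries = 3, retryWait = 30.seconds)
    val reboot = 20.minutes // assumption: lower bound of the reported reboot time

    println(s"retry window = ${window.toSeconds}s") // prints retry window = 90s
    println(s"reboot time  = ${reboot.toSeconds}s") // prints reboot time  = 1200s
    // false: the node is still down when the fetch retries run out
    println(s"node back within retry window: ${reboot <= window}")
  }
}
```

With these numbers the fetch retries are exhausted long before the node
returns, so a FetchFailedException is the expected outcome rather than
something these settings can avoid.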

When we get a FetchFailedException due to a data node reboot, Spark
should handle it gracefully, shouldn't it?
If not, how should we handle it?






Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

Your point -> "When Spark job shows FetchFailedException it creates few
duplicate data and we see few data also missing , please explain why. We
have scenario when spark job complains *FetchFailedException* as one of the
data node got rebooted middle of job running ."

As an engineer I can chip in. You may be seeing partial execution and
retries. When Spark encounters a *FetchFailedException*, it may retry
fetching the data from the unavailable node (the one being rebooted) a few
times before marking it permanently unavailable. However, if the rebooted
node recovers quickly within this retry window, some executors might
successfully fetch the data after a retry. *This can lead to duplicate
processing of the same data partition*.

I am not sure this one applies to your Spark version, but Spark may
speculatively execute tasks on different executors to improve
performance. If a task fails due to the *FetchFailedException*, a
speculative task might be launched on another executor. This is where the
fun and games start. If the unavailable node recovers before the
speculative task finishes, both the original and speculative tasks might
complete successfully, *resulting in duplicates*.

With regard to missing data, if the data node reboot leads to data
corruption or loss, some data partitions might be completely unavailable.
In this case, Spark may skip processing those partitions, leaving gaps in
the final output.
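
A quick way to tell which of the two symptoms you actually have is to
compare the ids you expected against the ids that were written. The snippet
below is only a minimal sketch in plain Scala. The names OutputAudit, Audit
and audit are made up for illustration, and it assumes every row carries a
unique id.

```scala
object OutputAudit {
  // Hypothetical result type: ids written more than once, and ids never written.
  final case class Audit(duplicates: Set[Long], missing: Set[Long])

  def audit(expected: Set[Long], written: Seq[Long]): Audit = {
    // Count how many times each id appears in the output.
    val counts = written.groupBy(identity).map { case (id, xs) => id -> xs.size }
    // An id seen more than once was written by more than one task attempt.
    val duplicates = counts.collect { case (id, n) if n > 1 => id }.toSet
    // An expected id that never appears was lost along the way.
    val missing = expected -- counts.keySet
    Audit(duplicates, missing)
  }

  def main(args: Array[String]): Unit = {
    val result = audit(expected = (0L until 5L).toSet, written = Seq(0L, 1L, 1L, 3L, 4L))
    println(result)
  }
}
```

On a real table the same two checks could be expressed as a group-by count
on the id column plus an anti-join against the expected ids.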

Potential remedies: Spark offers some features to mitigate these issues,
but they might not guarantee complete elimination of duplicates or data
loss. You can adjust parameters like *spark.shuffle.io.retryWait* and
*spark.speculation* to control retry attempts and speculative execution
behavior. Lineage tracking is there to help: Spark can track data lineage,
allowing you to identify potentially corrupted or missing data in some
cases. You can also consider persisting intermediate results to reliable
storage (like HDFS, GCS or another cloud storage) to avoid data loss in
case of node failures. Your mileage may vary, as this adds processing
overhead, but it can help ensure data integrity.
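
To make the remedies above a bit more concrete, here is a rough sketch in
Scala. The shuffle fetch retry settings I am aware of are
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait. The values, the
app name and the checkpoint path are placeholders I made up, not
recommendations, and the code needs Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object FetchFailureTuningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fetch-failure-tuning-sketch")       // hypothetical app name
      .config("spark.shuffle.io.maxRetries", "6")   // illustrative: more fetch retries
      .config("spark.shuffle.io.retryWait", "10s")  // illustrative: longer wait between retries
      .config("spark.speculation", "false")         // no speculative task attempts
      .getOrCreate()

    // Assumed path: any reliable storage (HDFS, GCS, ...) would do.
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    // Materialise the shuffled data to reliable storage so that later stages
    // read the checkpoint instead of recomputing the shuffle after a node loss.
    val shuffled = spark.range(0, 1000L * 1000L).repartition(100)
    val stable = shuffled.checkpoint() // eager by default

    println(stable.count())
    spark.stop()
  }
}
```

Checkpointing costs an extra write and read, so it is worth applying only
around the stage whose recomputation you want to avoid.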

HTH

Mich Talebzadeh,
Dad | Technologist
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Fri, 1 Mar 2024 at 20:56, Prem Sahoo <pr...@gmail.com> wrote:

> Hello All,
> in the list of JIRAs i didn't find anything related to
> fetchFailedException.
>
> as mentioned above
>
> "When Spark job shows FetchFailedException it creates few duplicate data
> and we see few data also missing , please explain why. We have a scenario
> when spark job complains FetchFailedException as one of the data nodes got
> rebooted in the middle of job running .
> Now due to this we have few duplicate data and few missing data . Why is
> spark not handling this scenario correctly ? kind of we shouldn't miss any
> data and we shouldn't create duplicate data . "
>
> We have to rerun the job again to fix this data quality issue . Please let
> me know why this case is not handled properly by Spark ?

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
Hello All,
In the list of JIRAs I didn't find anything related to
FetchFailedException.

As mentioned above:

"When Spark job shows FetchFailedException it creates few duplicate data
and we see few data also missing , please explain why. We have a scenario
when spark job complains FetchFailedException as one of the data nodes got
rebooted in the middle of job running .
Now due to this we have few duplicate data and few missing data . Why is
spark not handling this scenario correctly ? kind of we shouldn't miss any
data and we shouldn't create duplicate data . "

We have to rerun the job to fix this data quality issue. Please let
me know why this case is not handled properly by Spark.

On Thu, Feb 29, 2024 at 9:50 PM Dongjoon Hyun <do...@gmail.com>
wrote:

> Please use the URL as the full string, including the '()' part.
>
> Or you can search directly in ASF Jira with the 'Spark' project and three
> labels: 'Correctness', 'correctness' and 'data-loss'.
>
> Dongjoon

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Dongjoon Hyun <do...@gmail.com>.
Please use the URL as the full string, including the '()' part.

Or you can search directly in ASF Jira with the 'Spark' project and three
labels: 'Correctness', 'correctness' and 'data-loss'.
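
In case the link still does not open, the query behind it can be recovered
by URL-decoding its jql parameter. This is a small sketch in Scala using
only the JDK. DecodeJiraFilter is a made-up name, and the encoded string is
copied from the URL shared in this thread.

```scala
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

object DecodeJiraFilter {
  // The jql query parameter taken from the Jira link in this thread.
  val encodedJql: String =
    "project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)" +
      "%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)"

  // Turn percent-escapes such as %20, %3D and %2C back into plain characters.
  def decode(s: String): String =
    URLDecoder.decode(s, StandardCharsets.UTF_8.name())

  def main(args: Array[String]): Unit =
    println(decode(encodedJql))
}
```

The decoded query reads: project = SPARK AND fixVersion in (3.2.1, 3.2.2,
3.2.3, 3.2.4) AND labels in (Correctness, correctness, data-loss). It can be
pasted into the advanced search box in Jira.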

Dongjoon

On Thu, Feb 29, 2024 at 11:54 Prem Sahoo <pr...@gmail.com> wrote:

> Hello Dongjoon,
> Thanks for emailing me.
> Could you please share a list of fixes  as the link provided by you is
> not working.

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Prem Sahoo <pr...@gmail.com>.
Hello Dongjoon,
Thanks for emailing me.
Could you please share a list of fixes, as the link you provided is
not working?

On Thu, Feb 29, 2024 at 11:27 AM Dongjoon Hyun <do...@apache.org> wrote:

> Hi,
>
> If you are observing correctness issues, you may hit some old (and fixed)
> correctness issues.
>
> For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31 correctness
> issues.
>
>
> https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)
>
> There are more fixes in 3.3 and 3.4 and 3.5, too.
>
> Please use the latest version, Apache Spark 3.5.1, because Apache Spark
> 3.2 and 3.3 are in the End-Of-Support status of the community.
>
> It would be helpful if you can report any correctness issues with Apache
> Spark 3.5.1.
>
> Thanks,
> Dongjoon.

Re: When Spark job shows FetchFailedException it creates few duplicate data and we see few data also missing , please explain why

Posted by Dongjoon Hyun <do...@apache.org>.
Hi,

If you are observing correctness issues, you may be hitting some old (and since fixed) correctness issues.

For example, from Apache Spark 3.2.1 to 3.2.4, we fixed 31 correctness issues.

https://issues.apache.org/jira/issues/?filter=12345390&jql=project%20%3D%20SPARK%20AND%20fixVersion%20in%20(3.2.1%2C%203.2.2%2C%203.2.3%2C%203.2.4)%20AND%20labels%20in%20(Correctness%2C%20correctness%2C%20data-loss)

There are more fixes in 3.3 and 3.4 and 3.5, too.

Please use the latest version, Apache Spark 3.5.1, because Apache Spark 3.2 and 3.3 have reached End-Of-Support status in the community.

It would be helpful if you can report any correctness issues with Apache Spark 3.5.1.

Thanks,
Dongjoon.

On 2024/02/29 15:04:41 Prem Sahoo wrote:
> When Spark job shows FetchFailedException it creates few duplicate data and
> we see few data also missing , please explain why. We have scenario when
> spark job complains FetchFailedException as one of the data node got
> rebooted middle of job running .
> 
> Now due to this we have few duplicate data and few missing data . Why spark
> is not handling this scenario correctly ? kind of we shouldn't miss any
> data and we shouldn't create duplicate data .
> 
> 
> 
> I am using spark3.2.0 version.
> 

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org