You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Patrick McGloin <mc...@gmail.com> on 2016/01/28 17:51:22 UTC

Understanding Spark Task failures

I am trying to understand what will happen when Spark has an exception
during processing, especially while streaming.

If I have a small code spinet like this:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  throw new Exception("User exception...")
}

If I run this I will get output like this:

[info] processed => [List(Item1)]
[error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job
streaming job 1453999278000 ms.0
[error] java.lang.Exception: User exception...
...
[info] processed => [List(Item2)]
[error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job
streaming job 1453999279000 ms.0
[error] java.lang.Exception: User exception...

First "Item1" is processed, and it fails (of course). In the next batch
"Item2" is processed. The record "Item1" has now been lost.

If I change my code so that the exception occurs inside a task:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  rdd.map{case x => throw new Exception("User exception...") }.collect()
}

Then the map closure will be retried, but once it has failed enough times
the record is discarded and processing continues to the next record.

Is it possible to ensure that records are not discarded, even if this means
stopping the application? I have the WAL enabled.

Re: Understanding Spark Task failures

Posted by Patrick McGloin <mc...@gmail.com>.
Hi Tathagata,

Thanks for the response.  I can add in a try catch myself and handle user
exceptions, that's true, so maybe my example wasn't a very good one.  I'm
more worried about OOM exceptions and other run-time exceptions (that could
happen outside my try catch).

For example, I have this periodic "java.io.IOException: Class not found"
exception at the moment:

https://forums.databricks.com/questions/6601/javaioioexception-class-not-found-on-long-running.html

After this happens I lose data even though I have the WAL setup.  With the
WAL I can ensure that the data is safely stored when it has come into the
system from an external source, and I only ACK the external source after it
has been stored.  But it seems that there is no guarantee that the data is
successfully processed?

I assume I am right in what I am saying about losing data with the WAL
setup correctly.  The WAL works when stopping and starting the application,
etc.  But something is not handling the run time exception well.  This was
the start of my investigation into what is going wrong, so of course there
could be another reason for what I'm seeing.



On 28 January 2016 at 21:43, Tathagata Das <ta...@gmail.com>
wrote:

> That is hard to guarantee by the system, and it is upto the app developer
> to ensure that this is not . For example, if the data in a message is
> corrupted, unless the app code is robust towards handling such data, the
> system will fail every time it retries that app code.
>
> On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin <
> mcgloin.patrick@gmail.com> wrote:
>
>> I am trying to understand what will happen when Spark has an exception
>> during processing, especially while streaming.
>>
>> If I have a small code spinet like this:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   throw new Exception("User exception...")
>> }
>>
>> If I run this I will get output like this:
>>
>> [info] processed => [List(Item1)]
>> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job streaming job 1453999278000 ms.0
>> [error] java.lang.Exception: User exception...
>> ...
>> [info] processed => [List(Item2)]
>> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job streaming job 1453999279000 ms.0
>> [error] java.lang.Exception: User exception...
>>
>> First "Item1" is processed, and it fails (of course). In the next batch
>> "Item2" is processed. The record "Item1" has now been lost.
>>
>> If I change my code so that the exception occurs inside a task:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   rdd.map{case x => throw new Exception("User exception...") }.collect()
>> }
>>
>> Then the map closure will be retried, but once it has failed enough times
>> the record is discarded and processing continues to the next record.
>>
>> Is it possible to ensure that records are not discarded, even if this
>> means stopping the application? I have the WAL enabled.
>>
>
>

Re: Understanding Spark Task failures

Posted by Tathagata Das <ta...@gmail.com>.
That is hard to guarantee by the system, and it is upto the app developer
to ensure that this is not . For example, if the data in a message is
corrupted, unless the app code is robust towards handling such data, the
system will fail every time it retries that app code.

On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin <mc...@gmail.com>
wrote:

> I am trying to understand what will happen when Spark has an exception
> during processing, especially while streaming.
>
> If I have a small code spinet like this:
>
> myDStream.foreachRDD { (rdd: RDD[String]) =>
>   println(s"processed => [${rdd.collect().toList}]")
>   throw new Exception("User exception...")
> }
>
> If I run this I will get output like this:
>
> [info] processed => [List(Item1)]
> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job streaming job 1453999278000 ms.0
> [error] java.lang.Exception: User exception...
> ...
> [info] processed => [List(Item2)]
> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job streaming job 1453999279000 ms.0
> [error] java.lang.Exception: User exception...
>
> First "Item1" is processed, and it fails (of course). In the next batch
> "Item2" is processed. The record "Item1" has now been lost.
>
> If I change my code so that the exception occurs inside a task:
>
> myDStream.foreachRDD { (rdd: RDD[String]) =>
>   println(s"processed => [${rdd.collect().toList}]")
>   rdd.map{case x => throw new Exception("User exception...") }.collect()
> }
>
> Then the map closure will be retried, but once it has failed enough times
> the record is discarded and processing continues to the next record.
>
> Is it possible to ensure that records are not discarded, even if this
> means stopping the application? I have the WAL enabled.
>