Posted to user@spark.apache.org by Art Peel <fo...@gmail.com> on 2014/07/24 20:12:52 UTC

continuing processing when errors occur

Our system works with RDDs generated from Hadoop files. It processes each
record in a Hadoop file and for a subset of those records generates output
that is written to an external system via RDD.foreach. There are no
dependencies between the records that are processed.

If writing to the external system fails (due to a detail of what is being
written) and throws an exception, I see the following behavior:

1. Spark retries the entire partition (wasting time and effort), reaches
the problem record, and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a
result, the rest of the records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told
to shut down, so I lose a core for processing the remaining Hadoop files,
which slows down the entire process.

For this particular problem, I know how to prevent the underlying
exception, but I'd still like to get a handle on error handling for future
situations that I haven't yet encountered.

My goal is this:
Retry the problem record only (rather than starting over at the beginning
of the partition) up to N times, then give up and move on to process the
rest of the partition.

As far as I can tell, I need to supply my own retry behavior and if I want
to process records after the problem record I have to swallow exceptions
inside the foreach block.

My 2 questions are:
1. Is there anything I can do to prevent the executor from being shut down
when a failure occurs?

2. Are there ways Spark can help me get closer to my goal of retrying only
the problem record without writing my own re-try code and swallowing
exceptions?

Regards,
Art

Re: continuing processing when errors occur

Posted by Imran Rashid <im...@therashids.com>.
Whoops!  I just realized I was retrying the function even on success; I
didn't pay enough attention to the output from my calls.  Slightly updated
definitions:

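// Wraps f so that each call is attempted up to nTries times.
class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true  // stop retrying once a call succeeds
      } catch {
        // log the non-fatal exception and fall through to the next attempt
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

// Adds .retryable(nTries) to any A => Unit function.
implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}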


def tenDiv(x:Int) = println(x + " ---> " + (10 / x))


and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 ---> -5
-1 ---> -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 ---> 10
2 ---> 5
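
The original suggestion wondered whether there is a more functional way to
write this that avoids the while loop. One possible sketch, assuming a
hypothetical helper named retrying (not part of Spark or of the definitions
above), uses scala.util.Try and tail recursion. Like the NonFatal match
above, Try only catches non-fatal exceptions. The Boolean result is an
addition for illustration: it reports whether any attempt succeeded.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper: wraps f so that each call is attempted up to nTries
// times. The returned function yields true if some attempt succeeded and
// false if every attempt threw a non-fatal exception.
def retrying[A](nTries: Int)(f: A => Unit): A => Boolean = { a =>
  @tailrec
  def attempt(tries: Int): Boolean =
    if (tries > nTries) false
    else Try(f(a)) match {
      case Success(_) => true
      case Failure(ex) =>
        println(s"failed on input $a, try $tries with $ex")
        attempt(tries + 1)
    }
  attempt(1)
}
```

It can be passed to foreach in the same way, for example
(-2 to 2).foreach(retrying[Int](3)(tenDiv)), since foreach ignores the
function's return value.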





On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid <im...@therashids.com> wrote:


Re: continuing processing when errors occur

Posted by Imran Rashid <im...@therashids.com>.
Hi Art,

I have some advice that isn't Spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find it helpful.  I
think using an implicit to add your retrying behavior might be useful.  I
can think of two options:

1. enriching RDD itself, e.g. to add a .retryForeach, which would have the
desired behavior.

2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even
within spark, you might realize you want to do something similar for more
than just foreach.
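
For comparison, option 1 might look roughly like the sketch below. It is
only an illustration under stated assumptions: it enriches Iterable as a
stand-in for RDD so that it runs without Spark, and the names
RetryForeachOps and retryForeach are hypothetical.

```scala
import scala.util.control.NonFatal

// Stand-in for option 1: adds retryForeach to any Iterable. A real version
// would enrich RDD instead; Iterable is used here so no Spark is needed.
implicit class RetryForeachOps[A](items: Iterable[A]) {
  def retryForeach(nTries: Int)(f: A => Unit): Unit =
    items.foreach { a =>
      var tries = 0
      var success = false
      // Attempt each element up to nTries times, then move on to the next.
      while (!success && tries < nTries) {
        tries += 1
        try {
          f(a)
          success = true
        } catch {
          case NonFatal(ex) =>
            println(s"failed on input $a, try $tries with $ex")
        }
      }
    }
}
```

The rest of this message follows option 2.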

Here's an example.  (probably there is a more functional way to do this, to
avoid the while loop, but my brain isn't working and that's not the point
of this anyway)

Let's say we have this function:

def tenDiv(x:Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
    at .tenDiv(<console>:7)


We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
{
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while(!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
}



We "activate" this behavior by calling .retryable(nTries) on our method.
Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5


You could do the same thing on closures you pass to RDD.foreach.
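
One caveat when doing this with Spark: the function passed to RDD.foreach
is serialized so it can be shipped to the executors, and a class that
merely extends Function is not Serializable by itself. A minimal sketch,
assuming a hypothetical SerializableRetry variant of the wrapper (it also
stops after the first success) and using a plain Java serialization check
in place of Spark; isJavaSerializable and printTenDiv are illustrative
names as well:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.control.NonFatal

// Hypothetical variant of the retry wrapper that mixes in Serializable so
// that it can be serialized together with the function it wraps.
final class SerializableRetry[-A](nTries: Int, f: A => Unit)
    extends (A => Unit) with Serializable {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

// Returns true if Java serialization accepts the object, false otherwise.
def isJavaSerializable(obj: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj) finally out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

// A function literal that captures nothing from its surroundings.
val printTenDiv: Int => Unit = x => println(s"$x ---> ${10 / x}")
val wrapped = new SerializableRetry[Int](3, printTenDiv)
```

With Spark on the classpath, the call would then look something like
rdd.foreach(new SerializableRetry(3, writeRecord)), where writeRecord is a
placeholder for whatever function writes a record to the external system.
Anything that function captures has to be serializable too.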

I should add that I'm often very hesitant to use implicits because they can
make it harder to follow what's going on in the code.  I think this version
is OK, though, because somebody coming along later and looking at the code
can at least see the call to "retryable" as a clue.  (I really dislike
implicit conversions that happen without any hints in the actual code.)
Hopefully that's enough of a hint for others to figure out what is going
on.  E.g., IntelliJ will know where that method came from and jump to it,
and if you make the name unique enough, you can probably find it with
plain text search / ctags.  But it's definitely worth considering for
yourself.

hope this helps,
Imran


