Posted to user@spark.apache.org by François Garillot <fr...@typesafe.com> on 2014/11/14 11:41:10 UTC

Spark streaming fault tolerance question

Hi guys,

I have a question about how the basics of D-Streams, accumulators, failure
and speculative execution interact.

Let's say I have a streaming app that takes a stream of strings, formats
them (say, converts each to Unicode), and prints them (e.g. on a news
ticker). I know print() by default only prints 10 elements, but its
configuration is irrelevant here; let's assume the maximum number of printed
elements is set to be greater than n, the size of the stream we're dealing
with. I use one accumulator (A1) in the formatting operation to count the
number of elements, and another (A2) to count every element that goes through
the printing operation.
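
For concreteness, here is a minimal sketch of what I have in mind (the
socket source, port, batch interval, the upper-casing "format" step and the
names A1/A2 are just placeholders; I also use foreachRDD/foreach instead of
print() so that the A2 update sits inside the output action):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ticker")
val ssc  = new StreamingContext(conf, Seconds(1))
val sc   = ssc.sparkContext

// A1 counts elements in the formatting transformation,
// A2 counts elements in the printing output operation.
val A1 = sc.accumulator(0L)
val A2 = sc.accumulator(0L)

val lines = ssc.socketTextStream("localhost", 9999)

// Transformation: format each string, bumping A1 as a side effect.
val formatted = lines.map { s => A1 += 1L; s.toUpperCase }

// Output operation: print each element, bumping A2 as a side effect
// (standing in for print(), so the A2 update lives in the action).
formatted.foreachRDD { rdd =>
  rdd.foreach { s => A2 += 1L; println(s) }
}

ssc.start()
ssc.awaitTermination()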

Let's say I run this over a stream of n elements. What are the values of A1
and A2?
Now suppose I lose a node during processing. How do A1 and A2 compare to n?
Now suppose one batch of the stream is lagging behind, so I turn speculative
execution on. How do the values of A1 and A2 compare to n?

Here's how my reasoning goes:

If no node crashes and there is no speculative execution, then n = A1 = A2
(trivial counting).

Format is a transformation (it produces a new RDD); print is an action (it
doesn't produce an RDD - rather, it produces a side effect).

Transformations may be run multiple times (to rebuild the pre-shuffle
elements if a worker dies along the way), while actions are run at least
once: if a node crashes in the middle of printing, we restart from the
persisted "post-shuffle" collection (the shuffle here being trivial), i.e.
from the beginning, and may see multiple prints for the same element.

See
https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#fault-tolerance-properties

But I see this in Learning Spark:

Accumulators and Fault Tolerance

Spark automatically deals with failed or slow machines by re-executing
failed or slow tasks. For example, if the node running a partition of
a map operation
crashes, Spark will rerun it on another node; and even if the node does not
crash, but is simply much slower than other nodes, Spark can preemptively
launch a “speculative” copy of the task on another node, and take its
result if that finishes. Even if no nodes fail, Spark may have to rerun a
task to rebuild a cached value that falls out of memory. The net result is
therefore that the same function may run multiple times on the same data
depending on what happens on the cluster.

How does this interact with accumulators? The end result is that for
accumulators used in actions, Spark only applies each task’s update to each
accumulator once. Thus if we want a reliable absolute value counter,
regardless of failures or multiple evaluations, we must put it inside an
action like foreach.

For accumulators used in RDD transformations instead of actions, this
guarantee does not exist. An accumulator update within a transformation can
occur more than once. One such case of a probably unintended multiple
update occurs when a cached but infrequently used RDD is first evicted from
the LRU cache and is then subsequently needed. This forces the RDD to be
recalculated from its lineage, with the unintended side-effect that calls
to update an accumulator within the transformations in that lineage are
sent again to the driver. Within transformations, accumulators should,
consequently, only be used for debugging purposes.

While future versions of Spark may change this behavior to only count the
update once, the current version (1.0.0) does have the multiple-update
behavior, so accumulators in transformations are recommended only for
debugging purposes.
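
To make the book's contrast concrete, here is how I read it in plain
(non-streaming) Spark terms - the final values are only what I would expect
under those semantics, not something I've verified (reusing sc from the
sketch above):

val unreliable = sc.accumulator(0L)
val reliable   = sc.accumulator(0L)

val data = sc.parallelize(1 to 1000)

// Accumulator updated inside a transformation: if the stage is re-run
// (cache eviction, failed or speculative tasks), this can end up > 1000.
val doubled = data.map { x => unreliable += 1L; x * 2 }

// Accumulator updated inside an action: each task's update is applied
// only once, so this should end at exactly 1000 even if tasks are retried.
doubled.foreach { _ => reliable += 1L }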

So, in the case of a node failure, I may have A1 > n, and the same goes for
speculative execution.

However, the A2 accumulator update is somehow constrained to be a
'protected' side effect: even though we may see stream elements printed
several times if the printing node dies, we always have A2 = n, in both the
failure and speculative execution cases.

Am I right about this?

-- 
François Garillot