Posted to user@spark.apache.org by Jong Wook Kim <jo...@nyu.edu> on 2015/07/08 19:02:45 UTC

Streaming checkpoints and logic change

I just asked this question at the streaming webinar that just ended, but
the speakers didn't answer it, so I'm posting it here:

AFAIK checkpoints are the only recommended method for running Spark
Streaming without data loss. But checkpointing involves serializing the
entire DStream graph, which prohibits any logic changes. How should I
update or fix the logic of a running streaming app without any data loss?

Jong Wook

Re: Streaming checkpoints and logic change

Posted by Tathagata Das <td...@databricks.com>.
Hey Jong,

No, I did answer the right question. What I explained does not change the JVM
classes (that is, the function stays the same), yet it still lets the
computation differ over time (the filters get updated). So you can
checkpoint this and recover from it. This is ONE possible way to get
dynamically changing logic within the constraints of checkpointing. Since
no further detail was given on the kind of "dynamicity" in logic you
are interested in, I gave ONE possible way to do it. But I agree that I
should have tied up the loose ends by confirming that this works with
checkpointing, as the JVM class representing the function does not need to
change for the logic to change.

Now let's address your case of a log transformer which extracts fields of logs
from a Kafka stream. Could you provide some pseudocode for the kind of change
you would want to make? I want to learn more about what kind of dynamicity
people want. I am aware of this limitation and want to address it in the
future, but for that I need to understand the requirements.

BTW, your workaround is a pretty good workaround.

TD



Re: Streaming checkpoints and logic change

Posted by Jong Wook Kim <jo...@nyu.edu>.
Hi TD, you answered the wrong question. If you read the subject, mine was
specifically about checkpointing. I'll elaborate.

The checkpoint, which is a serialized DStream DAG, contains all the
metadata and *logic*, such as the function passed to e.g. DStream.transform().

This is serialized as an anonymous inner class at the JVM level, and it will
not tolerate the slightest logic change: the class signature changes, so the
new code cannot deserialize the checkpoint, which contains the serialized
form of the previous version.

Logic changes are extremely common in stream processing. Say I have a log
transformer which extracts certain fields of logs from a Kafka stream and I
want to add another field to extract. This involves DStream logic changes
and thus cannot be done with a checkpoint; I can't even achieve an
at-least-once guarantee.

My current workaround is to read the current offsets by casting to
HasOffsetRanges
<https://github.com/apache/spark/blob/v1.4.1-rc3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala#L36-L39>
and saving them to ZooKeeper, then passing the offsets read back from
ZooKeeper as the fromOffsets parameter when creating a directStream. I've
settled on this approach for now, but I want to know what the makers of
Spark Streaming think about this drawback of checkpointing.
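
A minimal sketch of the workaround described above, assuming the Spark 1.4
direct Kafka API (KafkaUtils.createDirectStream with a fromOffsets map, and
HasOffsetRanges on the RDDs it produces) and the spark-streaming-kafka
artifact on the classpath. OffsetStore, InMemoryOffsetStore,
OffsetWorkaround, nextOffsets and start are hypothetical names used only for
illustration; a ZooKeeper-backed OffsetStore is not shown, and the in-memory
one is only a placeholder.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical storage interface; a real one would read and write ZooKeeper nodes.
trait OffsetStore {
  def load(): Map[TopicAndPartition, Long]
  def save(offsets: Map[TopicAndPartition, Long]): Unit
}

// Placeholder implementation, for illustration only (offsets are lost on restart).
class InMemoryOffsetStore(initial: Map[TopicAndPartition, Long]) extends OffsetStore {
  @volatile private var offsets = initial
  def load(): Map[TopicAndPartition, Long] = offsets
  def save(o: Map[TopicAndPartition, Long]): Unit = { offsets = offsets ++ o }
}

object OffsetWorkaround {
  // The offset to resume from for each partition is the end of the finished batch.
  def nextOffsets(ranges: Array[OffsetRange]): Map[TopicAndPartition, Long] =
    ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap

  def start(ssc: StreamingContext, kafkaParams: Map[String, String], store: OffsetStore)
           (process: RDD[(String, String)] => Unit): Unit = {
    // fromOffsets comes from the store instead of from a Spark checkpoint.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, store.load(),
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.foreachRDD { rdd =>
      // The cast only works on RDDs that come straight from the direct stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      process(rdd)
      // Saving after processing means a crash replays the batch: at-least-once.
      store.save(nextOffsets(ranges))
    }
  }
}
```

Because the offsets live outside the checkpoint, the processing function can
change between deployments without anything having to be deserialized. Note
that fromOffsets decides which partitions are read, so the store presumably
has to be seeded with a starting offset for every partition before the first
run.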

If anyone has had a similar experience, suggestions would be appreciated.

Jong Wook




Re: Streaming checkpoints and logic change

Posted by Tathagata Das <td...@databricks.com>.
You can use DStream.transform for some stuff. Transform takes an RDD => RDD
function that allows arbitrary RDD operations to be done on the RDDs of a
DStream. This function gets evaluated on the driver at every batch
interval. If you are smart about writing the function, it can do different
stuff at different intervals. For example, you can always use a
continuously updated set of filters:

dstream.transform { rdd =>
   val broadcastedFilters = Filters.getLatest()
   val newRDD  = rdd.filter { x => broadcastedFilters.get.filter(x) }
   newRDD
}


The function Filters.getLatest() will return the latest set of filters that
has been broadcast, and as the transform function is evaluated in every
batch interval, it will always use the latest filters.
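
One way the Filters object above might look is sketched here. Everything in
it is an assumption made for illustration: the FilterSet type, the file path,
the one-minute refresh period, and the choice to pass the SparkContext into
getLatest (the snippet above calls it with no arguments). It uses Spark's
Broadcast.value rather than the .get shown above. Only sc.broadcast,
Broadcast.value, Broadcast.unpersist and DStream.transform are actual Spark
calls.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical filter set: keeps a record unless it contains a blocked word.
case class FilterSet(blocked: Set[String]) {
  def filter(record: String): Boolean = !blocked.exists(w => record.contains(w))
}

// Hypothetical driver-side holder standing in for `Filters` in the message above.
object Filters {
  private var current: Broadcast[FilterSet] = null
  private var loadedAtMs = 0L
  private val refreshMs = 60 * 1000L // assumed refresh period

  // Assumption: the rules live in some external source; a local file is used here.
  private def load(): FilterSet = {
    val src = scala.io.Source.fromFile("/tmp/blocked-words.txt") // hypothetical path
    try FilterSet(src.getLines().map(_.trim).filter(_.nonEmpty).toSet)
    finally src.close()
  }

  // Runs on the driver. Re-broadcasts when no copy exists yet (for example after
  // a restart, when this object starts out empty) or when the copy is stale.
  def getLatest(sc: SparkContext): Broadcast[FilterSet] = synchronized {
    val now = System.currentTimeMillis()
    if (current == null || now - loadedAtMs > refreshMs) {
      val old = current
      current = sc.broadcast(load())
      loadedAtMs = now
      if (old != null) old.unpersist() // drop executor copies of the old set
    }
    current
  }
}

object DynamicFilterJob {
  // The function passed to transform never changes, so its class stays the same;
  // only the data it looks up at each batch interval changes.
  def filtered(lines: DStream[String]): DStream[String] =
    lines.transform { rdd =>
      val filters = Filters.getLatest(rdd.sparkContext)
      rdd.filter(x => filters.value.filter(x))
    }
}
```

The closure handed to transform captures nothing except a reference to the
Filters singleton, which is why the filter rules themselves do not end up in
the serialized DStream graph.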

HTH.

TD
