Posted to user@spark.apache.org by András Kolbert <ko...@gmail.com> on 2020/09/03 09:41:05 UTC

Spark Streaming Checkpointing

Hi All,

I have a Spark Streaming application (Spark 2.4.4, Kafka 0.8, hence Spark
Direct Streaming) running just fine.

I create a context in the following way:

ssc = StreamingContext(sc, 60)
opts = {"metadata.broker.list": kafka_hosts,
        "auto.offset.reset": "largest",
        "group.id": run_type}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)

lines = kvs.map(lambda row: row[1])
lines.foreachRDD(streaming_app)
ssc.checkpoint(checkpoint)

The streaming app at a high level does this:

   - processes the incoming batch
   - unions it with the dataframe from the previous batch and aggregates
     the result

Currently, I use checkpointing explicitly (df = df.checkpoint()) to keep
the lineage short. However, this is quite an expensive exercise, and I was
wondering if there is a better way to do this.

I tried disabling this explicit checkpointing, since I already have
periodic checkpointing (kvs.checkpoint(120)), so I thought the lineage
would only reach back to that checkpointed RDD. In reality that is not the
case, and processing time keeps increasing with every batch.
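
For illustration, a minimal sketch of one possible middle ground: checkpoint
the running dataframe only every N batches instead of on every batch. The
likely reason the lineage keeps growing is that kvs.checkpoint(120) applies
to the RDDs the DStream generates, while the dataframe carried from batch to
batch builds its own plan with each union. RunningState, every and aggregate
are hypothetical names invented for this sketch, not Spark APIs; the helper
only assumes its inputs expose union() and checkpoint(), as a PySpark
DataFrame does.

```python
class RunningState:
    """Carries the aggregated dataframe from one batch to the next.

    Hypothetical helper, not part of Spark: it only assumes the objects it
    is given expose union() and checkpoint(), as pyspark.sql.DataFrame does.
    """

    def __init__(self, every=10):
        self.every = every    # checkpoint once per `every` batches (assumed value)
        self.batches = 0      # number of batches processed so far
        self.df = None        # running result carried over from earlier batches

    def update(self, batch_df, aggregate):
        # Union the new batch onto the previous result, then re-aggregate.
        merged = batch_df if self.df is None else self.df.union(batch_df)
        merged = aggregate(merged)
        self.batches += 1
        # Truncate the plan only on every N-th batch, so the cost of writing
        # a checkpoint is paid less often while the lineage stays bounded.
        if self.batches % self.every == 0:
            merged = merged.checkpoint()
        self.df = merged
        return merged
```

Inside streaming_app this would be called once per batch, for example
state.update(batch_df, aggregate), with state created once at start-up. On
Spark 2.3 and later, df.localCheckpoint() is a cheaper variant that avoids
writing to the checkpoint directory, at the price of weaker fault tolerance.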

Am I doing something inherently wrong? Is there a better way of doing this?

Thanks
Andras

Re: Spark Streaming Checkpointing

Posted by András Kolbert <ko...@gmail.com>.
Hi Gábor,


Thanks for your reply on this!

That version is what is used internally at the company I work at. It hasn't
been changed, mainly to stay compatible with the currently deployed Java
applications.

Hence I am attempting to make the most of this version :)

András



On Fri, 4 Sep 2020, 14:09 Gabor Somogyi, <ga...@gmail.com> wrote:

> Hi Andras,
>
> A general suggestion is to use Structured Streaming instead of DStreams
> because it provides several things out of the box (stateful streaming,
> etc...).
> Kafka 0.8 is super old and deprecated (no security...). Do you have a
> specific reason to use that?
>
> BR,
> G

Re: Spark Streaming Checkpointing

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Andras,

A general suggestion is to use Structured Streaming instead of DStreams
because it provides several things out of the box (stateful streaming,
etc.).
Kafka 0.8 is super old and deprecated (no security...). Do you have a
specific reason to use that?

BR,
G

