You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/08/06 05:35:34 UTC

Re: Upgrade of Spark-Streaming application

Hi

For checkpointing and using fromOffsets  arguments- Say for the first time
when my app starts I don't have any prev state stored and I want to start
consuming from largest offset

1.  is it possible to specify that in fromOffsets api- I don't want to use
another api which returs JavaPairInputDStream but fromoffsets api
returns JavaDStream - since I want to keep further flow of my app same in
both case.


2. So to achieve first(same flow in both cases) if I  use diff api in 2
cases and when I transfer JavaPairInputDStream  to JavaDStream  using map
function , I am no longer able to typecast transferred stream to
HasOffsetRanges for getting offstes of current run- it throws class cast
exception -
when i do
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
on transformed stream -

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges




On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <co...@koeninger.org> wrote:

> You can't use checkpoints across code upgrades.  That may or may not
> change in the future, but for now that's a limitation of spark checkpoints
> (regardless of whether you're using Kafka).
>
> Some options:
>
> - Start up the new job on a different cluster, then kill the old job once
> it's caught up to where the new job started.  If you care about duplicate
> work, you should be doing idempotent / transactional writes anyway, which
> should take care of the overlap between the two.  If you're doing batches,
> you may need to be a little more careful about handling batch boundaries
>
> - Store the offsets somewhere other than the checkpoint, and provide them
> on startup using the fromOffsets argument to createDirectStream
>
>
>
>
>
> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <ni...@gmail.com>
> wrote:
>
>> Hi,
>> I've read about the recent updates about spark-streaming integration with
>> Kafka (I refer to the new approach without receivers).
>> In the new approach, metadata are persisted in checkpoint folders on HDFS
>> so that the SparkStreaming context can be recreated in case of failures.
>> This means that the streaming application will restart from the where it
>> exited and the message consuming process continues with new messages only.
>> Also, if I manually stop the streaming process and recreate the context
>> from checkpoint (using an approach similar to
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>> the behavior would be the same.
>>
>> Now, suppose I want to change something in the software and modify the
>> processing pipeline.
>> Can spark use the previous checkpoint to recreate the new application?
>> Will I ever be able to upgrade the software without processing all the
>> messages in Kafka again?
>>
>> Regards,
>> Nicola
>>
>
>

Re: Upgrade of Spark-Streaming application

Posted by Cody Koeninger <co...@koeninger.org>.
Do the cast to HasOffsetRanges before calling any other methods on the
direct stream.  This is covered in the documentation:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

If you want to use fromOffsets, you can also just grab the highest
available offsets from Kafka and provide them to the api.  See the
(private) method getLatestLeaderOffsets for an example.

On Wed, Aug 5, 2015 at 10:35 PM, Shushant Arora <sh...@gmail.com>
wrote:

> Hi
>
> For checkpointing and using fromOffsets  arguments- Say for the first
> time when my app starts I don't have any prev state stored and I want to
> start consuming from largest offset
>
> 1.  is it possible to specify that in fromOffsets api- I don't want to
> use another api which returs JavaPairInputDStream but fromoffsets api
> returns JavaDStream - since I want to keep further flow of my app same in
> both case.
>
>
> 2. So to achieve first(same flow in both cases) if I  use diff api in 2
> cases and when I transfer JavaPairInputDStream  to JavaDStream  using map
> function , I am no longer able to typecast transferred stream to
> HasOffsetRanges for getting offstes of current run- it throws class cast
> exception -
> when i do
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
> on transformed stream -
>
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
>
>
>
> On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> You can't use checkpoints across code upgrades.  That may or may not
>> change in the future, but for now that's a limitation of spark checkpoints
>> (regardless of whether you're using Kafka).
>>
>> Some options:
>>
>> - Start up the new job on a different cluster, then kill the old job once
>> it's caught up to where the new job started.  If you care about duplicate
>> work, you should be doing idempotent / transactional writes anyway, which
>> should take care of the overlap between the two.  If you're doing batches,
>> you may need to be a little more careful about handling batch boundaries
>>
>> - Store the offsets somewhere other than the checkpoint, and provide them
>> on startup using the fromOffsets argument to createDirectStream
>>
>>
>>
>>
>>
>> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <ni...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I've read about the recent updates about spark-streaming integration
>>> with Kafka (I refer to the new approach without receivers).
>>> In the new approach, metadata are persisted in checkpoint folders on
>>> HDFS so that the SparkStreaming context can be recreated in case of
>>> failures.
>>> This means that the streaming application will restart from the where it
>>> exited and the message consuming process continues with new messages only.
>>> Also, if I manually stop the streaming process and recreate the context
>>> from checkpoint (using an approach similar to
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>>> the behavior would be the same.
>>>
>>> Now, suppose I want to change something in the software and modify the
>>> processing pipeline.
>>> Can spark use the previous checkpoint to recreate the new application?
>>> Will I ever be able to upgrade the software without processing all the
>>> messages in Kafka again?
>>>
>>> Regards,
>>> Nicola
>>>
>>
>>
>

Re: Upgrade of Spark-Streaming application

Posted by Shushant Arora <sh...@gmail.com>.
Also Is in  fromoffset api last saved offset is fetched twice ? Is
fromoffset api starts from Map<TopicAndPartition m,Long l>'s
Long value or LongValue+1 ? If its from Longvalue - it will be twice - once
it was in last application's run before crash and once after crash in first
run ?

On Thu, Aug 6, 2015 at 9:05 AM, Shushant Arora <sh...@gmail.com>
wrote:

> Hi
>
> For checkpointing and using fromOffsets  arguments- Say for the first
> time when my app starts I don't have any prev state stored and I want to
> start consuming from largest offset
>
> 1.  is it possible to specify that in fromOffsets api- I don't want to
> use another api which returs JavaPairInputDStream but fromoffsets api
> returns JavaDStream - since I want to keep further flow of my app same in
> both case.
>
>
> 2. So to achieve first(same flow in both cases) if I  use diff api in 2
> cases and when I transfer JavaPairInputDStream  to JavaDStream  using map
> function , I am no longer able to typecast transferred stream to
> HasOffsetRanges for getting offstes of current run- it throws class cast
> exception -
> when i do
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
> on transformed stream -
>
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
>
>
>
> On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> You can't use checkpoints across code upgrades.  That may or may not
>> change in the future, but for now that's a limitation of spark checkpoints
>> (regardless of whether you're using Kafka).
>>
>> Some options:
>>
>> - Start up the new job on a different cluster, then kill the old job once
>> it's caught up to where the new job started.  If you care about duplicate
>> work, you should be doing idempotent / transactional writes anyway, which
>> should take care of the overlap between the two.  If you're doing batches,
>> you may need to be a little more careful about handling batch boundaries
>>
>> - Store the offsets somewhere other than the checkpoint, and provide them
>> on startup using the fromOffsets argument to createDirectStream
>>
>>
>>
>>
>>
>> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <ni...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I've read about the recent updates about spark-streaming integration
>>> with Kafka (I refer to the new approach without receivers).
>>> In the new approach, metadata are persisted in checkpoint folders on
>>> HDFS so that the SparkStreaming context can be recreated in case of
>>> failures.
>>> This means that the streaming application will restart from the where it
>>> exited and the message consuming process continues with new messages only.
>>> Also, if I manually stop the streaming process and recreate the context
>>> from checkpoint (using an approach similar to
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>>> the behavior would be the same.
>>>
>>> Now, suppose I want to change something in the software and modify the
>>> processing pipeline.
>>> Can spark use the previous checkpoint to recreate the new application?
>>> Will I ever be able to upgrade the software without processing all the
>>> messages in Kafka again?
>>>
>>> Regards,
>>> Nicola
>>>
>>
>>
>