Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2016/10/24 07:43:33 UTC

spark streaming with kinesis

Does the Spark Streaming consumer for Kinesis use the Kinesis Client Library,
and does it mandate checkpointing the shard sequence numbers in DynamoDB?

Will it lead to data loss if consumed data records are not yet processed,
Kinesis has checkpointed the consumed sequence numbers in DynamoDB, and the
Spark worker crashes? Spark then launches the worker on another node, but it
starts consuming from DynamoDB's checkpointed sequence number, which is ahead
of the last processed sequence number.

Is there a way to checkpoint the sequence numbers ourselves in Kinesis, as
with the Kafka low-level consumer?

Thanks

Re: spark streaming with kinesis

Posted by Takeshi Yamamuro <li...@gmail.com>.
I'm not familiar with the Kafka implementation, but a Kinesis receiver runs
in a thread on the executors.
You can set any value for the interval, but frequent checkpoints put excess
load on DynamoDB.
See:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html#kinesis-checkpointing
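
For reference, a minimal sketch of where that interval is passed, assuming
the KinesisUtils.createStream API from the docs above (ssc is an existing
StreamingContext; the app/stream names, endpoint, and region are
placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Seconds(10) is the Kinesis checkpoint interval; a larger value means
// fewer writes to the DynamoDB table named after the app ("my-app").
val stream = KinesisUtils.createStream(
  ssc, "my-app", "my-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(10),
  StorageLevel.MEMORY_AND_DISK_2)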

// maropu


-- 
---
Takeshi Yamamuro

Re: spark streaming with kinesis

Posted by Shushant Arora <sh...@gmail.com>.
Hi

By receiver I meant the Spark Streaming receiver architecture, where worker
nodes are distinct from receiver nodes. So there is no direct/low-level
consumer for Kinesis in Spark Streaming, like there is for Kafka?

Is there any limit on the checkpoint interval, such as a minimum of 1 second,
in Spark Streaming with Kinesis? As far as I can tell, there is no limit on
the checkpoint interval on the KCL side.

Thanks

Re: spark streaming with kinesis

Posted by Takeshi Yamamuro <li...@gmail.com>.
I'm not sure exactly which receiver you're referring to, but if you mean the
"KinesisReceiver" implementation, then yes.

Also, we currently cannot disable the interval checkpoints.

-- 
---
Takeshi Yamamuro

Re: spark streaming with kinesis

Posted by Shushant Arora <sh...@gmail.com>.
Thanks!

Are Kinesis streams receiver-based only? Is there a non-receiver-based
consumer for Kinesis?

And instead of a fixed checkpoint interval, can I disable auto-checkpointing
and instead checkpoint the sequence number via some API once my worker has
processed the data, after the last record of mapPartitions?

Re: spark streaming with kinesis

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

The only thing you can do for Kinesis checkpoints is tune their interval:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68

Whether data loss occurs depends on the storage level you set; with
StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing after a crash
because the stream data Spark receives is replicated across executors.
However, if all the executors that hold the replicated data crash, IIUC data
loss occurs.
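
For illustration, here is a self-contained sketch, assuming the
KinesisUtils.createStream signature linked above (the app/stream names,
endpoint, and region are placeholders; the app name doubles as the DynamoDB
table the KCL checkpoints into):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(2))  // batch interval

val stream = KinesisUtils.createStream(
  ssc,
  "kinesis-example",                          // KCL app name / DynamoDB table
  "my-stream",                                // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // endpoint URL
  "us-east-1",                                // region
  InitialPositionInStream.TRIM_HORIZON,       // start here when no checkpoint exists
  Seconds(10),                                // Kinesis checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)             // replicate received blocks to two executors

stream.map(bytes => new String(bytes, "UTF-8")).print()
ssc.start()
ssc.awaitTermination()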

// maropu

-- 
---
Takeshi Yamamuro