Posted to dev@samza.apache.org by Chad Greenberg <ev...@hotmail.com> on 2017/02/06 22:57:44 UTC

Questions about checkpointing and Kinesis

I am starting on an integration project between a Kinesis stream and Samza. I have no background in either, but I am familiar with most other messaging/queuing systems.


I decided to keep all state management within Samza instead of using the Kinesis Client Library. My plan was to use the default KafkaCheckpointManagerFactory on a timed interval, but I have a few questions.


What exactly is being checkpointed? What value can I retrieve to use as an offset for my Kinesis stream? Or is this something I need to keep track of in a store? If so, what is the point of checkpointing? Can I use RocksDB to save the Kinesis offset for every document (efficiently, that is)?


Related to Kinesis rather than Samza: it does not seem to have a listener/push framework and instead requires constant polling (unless I missed something). First of all, I was going to have one partition for each Kinesis shard. But the next question is, should I simply have a while(true) polling loop inside my consumer (BlockingEnvelopeMap)? That seems inefficient, even with a timeout. How can I get new data to instantiate a new consumer? My consumer will put each new document to my task.


Cheers.




Re: Questions about checkpointing and Kinesis

Posted by Renato Marroquín Mogrovejo <re...@gmail.com>.
That sounds great, Chad! Please let me know how you get on.
I will try to update the references to the latest Samza, try out the code I
have, and then report back.


Best,


Renato M.


Re: Questions about checkpointing and Kinesis

Posted by Chad Greenberg <ev...@hotmail.com>.
Thanks for the link Renato. I was able to incorporate some changes thanks to Jagadish and Boris, but I will test out your code. I will update the dependencies and take it from there.



Re: Questions about checkpointing and Kinesis

Posted by Renato Marroquín Mogrovejo <re...@gmail.com>.
Hi all,

First of all, I'd like to apologize for having gone missing for such a long
time. Student life wasn't easy last year and I neglected the work done on
this topic.
Anyway, the work done during GSoC is at [1] (I did share it with people,
but not publicly).
@Chad I'd be more than happy (and thankful!) to work with you to bring it
up to date and finally get it into Samza trunk. Would you be up for that? :)


Best,

Renato M.

[1] https://github.com/renato2099/SamzaKinesis


Re: Questions about checkpointing and Kinesis

Posted by Boris S <bo...@gmail.com>.
If you are implementing your own consumer, Samza will call the consumer's
register method on start and pass it the offsets.
If you are using CheckpointListener (as Jagadish mentioned), you will get
a callback on each checkpoint (so you can ignore the register call).
Otherwise, you can use the checkpoint passed to you in the register call.
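
A minimal sketch of what a consumer might do with the offset it receives in
the register call. StartingPositions and its methods are hypothetical names
for illustration, not Samza classes. The sketch assumes that a null offset
means no checkpoint exists yet, and that a non-null offset is the last Kinesis
sequence number that was processed. If the offset is instead the next one to
read, AT_SEQUENCE_NUMBER would be the matching iterator type.

```java
import java.util.HashMap;
import java.util.Map;

/** Records, per shard, where reading should begin once the consumer starts. */
final class StartingPositions {
    // Names of Kinesis shard iterator types, kept as plain strings in this sketch.
    static final String TRIM_HORIZON = "TRIM_HORIZON";
    static final String AFTER_SEQUENCE_NUMBER = "AFTER_SEQUENCE_NUMBER";

    private final Map<String, String> iteratorType = new HashMap<>();
    private final Map<String, String> startSequence = new HashMap<>();

    /** Hypothetical analogue of the register call: one shard, one starting offset. */
    void register(String shardId, String offset) {
        if (offset == null) {
            // Assumption: no checkpoint yet, so start from the oldest available record.
            iteratorType.put(shardId, TRIM_HORIZON);
        } else {
            // Assumption: offset is the last processed sequence number, so resume after it.
            iteratorType.put(shardId, AFTER_SEQUENCE_NUMBER);
            startSequence.put(shardId, offset);
        }
    }

    String iteratorTypeFor(String shardId) {
        return iteratorType.get(shardId);
    }

    String startSequenceFor(String shardId) {
        return startSequence.get(shardId);
    }
}
```

The stored type and sequence number would then be used to request the first
shard iterator when the consumer starts.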


Re: Questions about checkpointing and Kinesis

Posted by Chad Greenberg <ev...@hotmail.com>.
Thanks Jagadish for the reply. A few comments.


I was under the impression that using the KCL would mean recording the offsets in Kinesis/DynamoDB and therefore not in Samza. I am avoiding the KCL so that I can save state in Samza.


I assumed that the checkpoint values were based on the values passed to the put() method, but I did not see any explicit documentation to that effect (I have been reading a ton, so I could have missed something). What I do not see, however, is how to retrieve those values upon start-up. Is this the role of the SystemAdmin? Is there any documentation about the use of SystemAdmin?


Re: Questions about checkpointing and Kinesis

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Great to hear about this development on the Kinesis consumer!

Let me answer some of your questions here.

*1. "Kinesis does not have a listener/push framework (unless I missed
something)"*

Let me point out that Kinesis has both a push-based and a pull-based model.
You can choose to implement either for your use case.

   - *Pull:* The pull-based model supports obtaining a *ShardIterator* for
   a shard and iterating on it. Please refer to the docs for
   *GetShardIteratorRequest* here
   <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>.
   It's important that you are aware of re-sharding: getNextShardIterator()
   on the GetRecords result can return null if there was a merge or a
   split for the shard. (You can trivially handle re-shards by re-starting.)
   - *Push:* The push-based model directly uses the KCL (Kinesis Client
   Library) to subscribe to events. The KCL will handle sharding,
   re-balancing, and checkpointing internally.
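
A minimal sketch of the pull loop described above. Batch, ShardReader and
PullLoop are hypothetical stand-ins so the example runs without the AWS SDK;
a real consumer would call the Kinesis client where ShardReader is used. The
only behaviour taken from the text is that the next shard iterator becomes
null once the shard has been closed by a split or merge.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one GetRecords response: a batch plus the next iterator. */
final class Batch {
    final List<String> records;
    final String nextShardIterator; // null once the shard was closed by a split or merge

    Batch(List<String> records, String nextShardIterator) {
        this.records = records;
        this.nextShardIterator = nextShardIterator;
    }
}

/** Hypothetical stand-in for the Kinesis client. */
interface ShardReader {
    Batch getRecords(String shardIterator);
}

final class PullLoop {
    /** Reads batches until the iterator becomes null, then returns everything read. */
    static List<String> drain(ShardReader reader, String firstIterator) {
        List<String> out = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = reader.getRecords(iterator);
            out.addAll(batch.records);
            // A null value here ends the loop: the shard has no more data to offer.
            iterator = batch.nextShardIterator;
        }
        return out;
    }
}
```

On an open shard the iterator never becomes null, so a real loop would hand
each batch to the task as it arrives and pause between empty batches rather
than collect everything into one list.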

Your consumer can implement the *CheckpointListener
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>*
interface to get notified when Samza performs a checkpoint. You can save
the offsets of the shards to Kinesis by invoking the
IRecordProcessorCheckpointer.checkpoint
<https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java>
APIs.
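
A minimal sketch of that forwarding idea. OffsetCheckpointListener and
ShardCheckpointer are hypothetical interfaces defined here so the example is
self-contained. They only approximate the shape of Samza's CheckpointListener
and the KCL checkpointer, and the string shard ids used as keys are an
assumption as well.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical checkpoint callback: shard id -> checkpointed sequence number. */
interface OffsetCheckpointListener {
    void onCheckpoint(Map<String, String> offsets);
}

/** Hypothetical per-shard sink; with the KCL, the record processor's checkpointer would play this role. */
interface ShardCheckpointer {
    void checkpoint(String sequenceNumber);
}

/** Forwards each checkpointed offset to the checkpointer registered for that shard. */
final class ForwardingListener implements OffsetCheckpointListener {
    private final Map<String, ShardCheckpointer> checkpointers = new HashMap<>();

    void registerShard(String shardId, ShardCheckpointer checkpointer) {
        checkpointers.put(shardId, checkpointer);
    }

    @Override
    public void onCheckpoint(Map<String, String> offsets) {
        for (Map.Entry<String, String> entry : offsets.entrySet()) {
            ShardCheckpointer checkpointer = checkpointers.get(entry.getKey());
            if (checkpointer != null) {
                // Shards this consumer does not own are skipped.
                checkpointer.checkpoint(entry.getValue());
            }
        }
    }
}
```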


*2. "My plan was to use the default KafkaCheckpointManagerFactory on an
timed interval basis"*

The checkpoint manager merely provides persistence for the checkpoints. (In
that sense, it's actually a checkpoint writer). You probably don't want to
implement a custom checkpoint manager.


*3. "What exactly is being checkpointed? What value can I retrieve to use
as an offset for my Kinesis stream? Or is this something I need to keep
track of in a store? If so, what is the point of checkpointing? Can I use
RocksDb to save the Kinesis offset at every document (efficiently that
is)?"*

- Samza checkpoints [ssp, offset] pairs for your tasks.
- Kinesis has an implicit notion of sequence numbers for every shard in a
stream. You can use those as offsets.
- You don't want to record offsets in a separate store. If you want Samza
to manage offsets, Samza will use Kafka internally. If you want Kinesis to
manage offsets (KCL), Kinesis will use DynamoDB to store its offsets.
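
A minimal sketch of tracking [ssp, offset] pairs when the offset is a Kinesis
sequence number. OffsetTracker and the "system.stream.partition" key format
are hypothetical. The sketch assumes sequence numbers are decimal strings
that can be too long for a long, so they are stored as strings and compared
as BigInteger values.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Keeps the highest sequence number seen for each system-stream-partition. */
final class OffsetTracker {
    // Key: "system.stream.partition" (hypothetical flattening of an SSP).
    // Value: last processed sequence number, kept as a string.
    private final Map<String, String> lastProcessed = new HashMap<>();

    void record(String ssp, String sequenceNumber) {
        // Compare numerically: as plain strings, "99" would sort after "100".
        lastProcessed.merge(ssp, sequenceNumber, (old, next) ->
            new BigInteger(next).compareTo(new BigInteger(old)) > 0 ? next : old);
    }

    /** Returns a copy of the current [ssp, offset] pairs, as a checkpoint would capture them. */
    Map<String, String> snapshot() {
        return new HashMap<>(lastProcessed);
    }
}
```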





-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University