Posted to user@flink.apache.org by Ashwin Sinha <as...@go-mmt.com> on 2018/06/24 19:05:42 UTC

Some doubts related to RocksDB state backend and checkpointing!

Hi,

We are using Flink 1.3.2 and exploring the RocksDB state backend and
checkpointing. The data source is Kafka, and checkpointing is enabled in Flink.
We have a few questions about this:

   - What is the exact difference between a checkpoint and a state backend?
   - Is the data stored in RocksDB checkpoints incremental (does a newer file
   also keep all past data)? A new checkpoint is created after the defined
   interval; does it contain the previous checkpoint's data? Our use case
   requires all the checkpoint data to be in a single DB, but when we manually
   restart the job its ID changes and a new directory gets created (a new
   metadata file in the case of savepoints).
   - What data does RocksDB store in the case of checkpoints? We are
   interested in knowing whether it stores the actual aggregations or the
   offset metadata for an aggregation window.
   - If we run aggregations on past data, will the state backend help avoid
   running the aggregations again by returning results queried from the
   state backend, saving processing time?


-- 
*Ashwin Sinha *| Data Engineer
ashwin.sinha@go-mmt.com <sh...@go-mmt.com> | 9452075361
2nd floor, Tower B Divyashree Technopolis Yemalur,
Bangalore, Karnataka 560025, India

-- 


::DISCLAIMER::


----------------------------------------------------------------------------------------------------------------------------------------------------





This message is intended only for the use of the addressee and may 
contain information that is privileged, confidential and exempt from 
disclosure under applicable law. If the reader of this message is not the 
intended recipient, or the employee or agent responsible for delivering the 
message to the intended recipient, you are hereby notified that any 
dissemination, distribution or copying of this communication is strictly 
prohibited. If you have received this e-mail in error, please notify us 
immediately by return e-mail and delete this e-mail and all attachments 
from your system.

Re: Some doubts related to RocksDB state backend and checkpointing!

Posted by sihua zhou <su...@163.com>.

Hi Ashwin,


I think the questions are a bit general, which makes it hard to give answers that match exactly what you expect. Could you briefly outline your use case and relate it to the questions? That would make it much easier to help. I would also suggest having a look at https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/ for information about state, state backends, and checkpoints.


That said, I have tried to answer the questions from my own understanding; I hope it helps.


> What is the exact difference between checkpoint and state backend.


In a nutshell (for simplicity, consider only the keyed state backend here), you can think of the state backend as a local database on each stateful node that stores key-value pairs. A checkpoint consists of a snapshot of the state backends across the distributed job at a specific point in time; as @Minglei mentioned, it is used for fault tolerance. When the job fails, it can recover from the latest successful checkpoint (the state backend's snapshot is used to initialize the state backend) and continue working without losing any data, which gives the job an "exactly once" result (you can also configure checkpointing to achieve an "at least once" result).
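
To make the distinction concrete, here is a minimal sketch in plain Python. It is not Flink's API: ToyKeyedStateBackend and count_events are hypothetical names, and the "failure" is simulated by creating a fresh backend. It only illustrates that the backend holds the live key-value state, while a checkpoint is a point-in-time copy used to initialize the backend on recovery, after which the source replays the later events.

```python
import copy


class ToyKeyedStateBackend:
    """Hypothetical stand-in for a keyed state backend: a local key-value store."""

    def __init__(self):
        self._state = {}

    def get(self, key, default=None):
        return self._state.get(key, default)

    def put(self, key, value):
        self._state[key] = value

    def snapshot(self):
        # A checkpoint holds a point-in-time copy of the state.
        return copy.deepcopy(self._state)

    def restore(self, snapshot):
        # Recovery initializes the backend from the checkpointed copy.
        self._state = copy.deepcopy(snapshot)


def count_events(backend, events):
    """User logic: count events per key, keeping the counts in the backend."""
    for key in events:
        backend.put(key, backend.get(key, 0) + 1)


backend = ToyKeyedStateBackend()
count_events(backend, ["a", "b", "a"])
checkpoint = backend.snapshot()      # taken after the first three events

count_events(backend, ["a"])         # processed after the checkpoint

# Simulate a failure: a fresh backend is initialized from the checkpoint,
# and the source replays the events that arrived after it.
recovered = ToyKeyedStateBackend()
recovered.restore(checkpoint)
count_events(recovered, ["a"])
print(recovered.get("a"), recovered.get("b"))  # prints: 3 1
```

The recovered counts match what an uninterrupted run would have produced, which is the "exactly once" effect on state described above.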


> Is the data stored in RocksDB checkpoints incremental (does a newer file also keep all past data)? A new checkpoint is created after the defined interval; does it contain the previous checkpoint's data? Our use case requires all the checkpoint data to be in a single DB, but when we manually restart the job its ID changes and a new directory gets created (a new metadata file in the case of savepoints).


Every checkpoint consists of a complete snapshot of the state backends across the distributed job, which means it also covers the earlier state data; its implementation, however, can be incremental.
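
As a rough illustration of "logically complete, physically incremental", here is a toy sketch in plain Python. It is not how Flink or RocksDB is actually implemented: the file names, take_checkpoint, and restore are hypothetical, and the state is modeled as a set of immutable files where later files override earlier ones.

```python
def take_checkpoint(live_files, uploaded):
    """Return (manifest, newly_copied) for one toy incremental checkpoint.

    live_files: dict of file name -> contents currently making up the state.
    uploaded:   dict acting as durable storage shared by all checkpoints.
    """
    new_files = [name for name in live_files if name not in uploaded]
    for name in new_files:
        uploaded[name] = live_files[name]
    # The manifest lists every file the checkpoint needs, so the checkpoint
    # is logically complete even though only the new files were copied.
    return sorted(live_files), sorted(new_files)


def restore(manifest, uploaded):
    """Rebuild the full state from a single checkpoint's manifest."""
    state = {}
    for name in manifest:  # in this toy, names sort in creation order
        state.update(uploaded[name])
    return state


storage = {}
files = {"file-1": {"a": 1, "b": 1}}
manifest_1, copied_1 = take_checkpoint(files, storage)

files["file-2"] = {"a": 2}           # later updates land in a new file
manifest_2, copied_2 = take_checkpoint(files, storage)

print(copied_2)                      # only the new file was copied
print(restore(manifest_2, storage))  # yet the checkpoint restores everything
```

The second checkpoint copies only file-2, but restoring from it alone still yields the full state, including the data first written before the first checkpoint.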


> What data does RocksDB store in the case of checkpoints? We are interested in knowing whether it stores the actual aggregations or the offset metadata for an aggregation window.


1. The RocksDB state backend stores key-value pairs, and a checkpoint consists of a snapshot of the RocksDB state backend.
 
2. I am not sure exactly what you mean here. I assume your job looks something like the following:
{code}
stream.keyBy(keyField).window(windowAssigner).process(new AggregationFunc());
{code}
If your job can be described as above, then the aggregation results (generated in AggregationFunc) are stored in RocksDB as the value part; the corresponding key part is the "key field" (or the "key field" + "window" if you are using per-window state).
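
The key/value layout described above can be sketched in plain Python. This is only an illustration, not Flink's storage format: the one-minute tumbling window, the event tuples, and the names window_start and aggregate are all assumptions made for the example, and a dict stands in for RocksDB.

```python
from collections import defaultdict

WINDOW_SIZE_MS = 60_000  # assumed tumbling window of one minute


def window_start(timestamp_ms):
    """Start of the tumbling window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % WINDOW_SIZE_MS)


def aggregate(events):
    """Sum values per (key, window); the dict plays the role of the state."""
    state = defaultdict(int)
    for key, timestamp_ms, value in events:
        # Key part: the key field plus the window. Value part: the aggregate.
        state[(key, window_start(timestamp_ms))] += value
    return dict(state)


events = [
    ("user-1", 5_000, 10),
    ("user-1", 30_000, 5),
    ("user-2", 45_000, 7),
    ("user-1", 65_000, 1),   # falls into the next window
]
for state_key, total in sorted(aggregate(events).items()):
    print(state_key, total)
```

What ends up in the store is the aggregated value itself, one entry per key and window, rather than only offsets or metadata about the window.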


> If we run aggregations on past data, will the state backend help avoid running the aggregations again by returning results queried from the state backend, saving processing time?


The state backend only stores key-value pairs; the aggregation logic is implemented in user code. I think you can use state to do what you want here.
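
One way to read this, as a hedged sketch in plain Python rather than Flink code: the user function checks the state for an already stored result before recomputing. The names aggregate_once and expensive_aggregate, and the date-string key, are hypothetical; a dict stands in for keyed state.

```python
compute_calls = []  # records each real computation, for illustration only


def expensive_aggregate(values):
    compute_calls.append(len(values))
    return sum(values)


def aggregate_once(state, key, values):
    """User code: reuse a stored result if present, else compute and store it."""
    if key not in state:
        state[key] = expensive_aggregate(values)
    return state[key]


state = {}
first = aggregate_once(state, "2018-06-24", [1, 2, 3])
second = aggregate_once(state, "2018-06-24", [1, 2, 3])  # served from state
print(first, second, len(compute_calls))  # prints: 6 6 1
```

The reuse happens only because the user code looks the result up; the store itself does not skip any computation.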


Best, Sihua
On 06/25/2018 09:31, zhangminglei <18...@163.com> wrote:
Hi Ashwin,


> What is the exact difference between checkpoint and state backend?


Ans: I can answer the first question you asked. A checkpoint is a mechanism that makes your program fault tolerant. Flink implements checkpoints using distributed snapshots. The question then is where to store this state for your program, and that is where the state backend comes in.
You can store your state in memory, on a filesystem, or in RocksDB. The default is the memory state backend. For more details, see [1] and [2].


Cheers
Minglei


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/stream_checkpointing.html#introduction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html







Re: Some doubts related to RocksDB state backend and checkpointing!

Posted by zhangminglei <18...@163.com>.
Hi Ashwin,

> What is the exact difference between checkpoint and state backend?


Ans: I can answer the first question you asked. A checkpoint is a mechanism that makes your program fault tolerant. Flink implements checkpoints using distributed snapshots. The question then is where to store this state for your program, and that is where the state backend comes in.
You can store your state in memory, on a filesystem, or in RocksDB. The default is the memory state backend. For more details, see [1] and [2].

Cheers
Minglei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/stream_checkpointing.html#introduction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html

