Posted to user@flink.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2016/10/23 20:29:47 UTC

About stateful transformations

Hi all,

I don't have much experience with Flink, so please forgive me if I ask some
obvious questions. I was taking a look at the documentation on stateful
transformations in Flink at
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html.
I'm mostly interested in Flink for stream processing, and I would like to
know:

- What is the biggest state that has been used in production deployments?
I'm interested in how many GB of state, among all key-value pairs, have
been used before in long running streaming jobs deployed in production.
Maybe some company has shared this in some conference or blog post. I guess
the RocksDB backend is the best option for big states, to avoid being
limited by the available memory.

- Is there any pre-built functionality for state eviction? I'm thinking of
LRU cache-like behavior, with eviction based on time or size, similar to
Guava cache (https://github.com/google/guava/wiki/CachesExplained). This is
probably easy to implement anyway, by using the clear() primitive, but I
wouldn't like to reinvent the wheel if this is already implemented
somewhere.

- When using file:// for the checkpointing URL, is the data replicated in
the workers, or does a failure in a worker lead to losing the state stored
in that worker? I guess with hdfs:// we get the replication of HDFS, and we
don't have that problem. I also guess S3 can be used for checkpointing
state too. Is there any noticeable performance impact of using S3 instead
of HDFS for checkpointing? I guess some performance is lost compared to a
setup running in YARN with co-located DataNodes and NodeManagers, but I
wanted to know if the impact is negligible, as checkpointing is performed
at a relatively low frequency. I'm also interested in Flink running on EMR,
where the impact of this should be even smaller because the access to S3 is
faster from EMR than from an in-house YARN cluster outside the AWS cloud.

- Is there any problem with the RocksDB backend on top of HDFS related to
fragmentation? How is clear() handled for long-running jobs? I'm thinking
of a streaming job that has a state with a size of several hundred GBs,
where each key-value pair is stored for a week and then deleted. How does
clear() work, and how do you deal with the "small files problem" of HDFS
(http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for
the FsStateBackend and RocksDB backends on top of HDFS? I guess this
wouldn't be a problem for S3, as it is an object store that has no problem
with small files.

Thanks a lot for your help!

Greetings,

Juan Rodriguez Hortala

Re: About stateful transformations

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Aljoscha,

Thanks for your answer. At least by keeping only the latest one we don't
have retention problems with the state backend, and for now I guess we
could use manually triggered savepoints if we needed to store the history
of the state.

Thanks,

Juan

On Tue, Oct 25, 2016 at 6:58 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> there is already a mechanism for that. Currently, Flink will only keep the
> most recent, successful checkpoint. We are currently working on making that
> configurable so that, for example, the last n successful checkpoints can be
> kept.
>
> Cheers,
> Aljoscha

Re: About stateful transformations

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
there is already a mechanism for that. Currently, Flink will only keep the
most recent, successful checkpoint. We are currently working on making that
configurable so that, for example, the last n successful checkpoints can be
kept.
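
The retention behaviour described above (keep only the newest successful
checkpoint today, possibly the last n later) can be pictured with a small
Python model. This is only a sketch and not Flink's implementation: the
class name, the max_retained parameter and the "chk-N" paths are
hypothetical names chosen for illustration.

```python
from collections import deque
from typing import Deque, List


class CheckpointRetention:
    """Toy model: remember the newest `max_retained` successful checkpoints."""

    def __init__(self, max_retained: int = 1) -> None:
        if max_retained < 1:
            raise ValueError("max_retained must be at least 1")
        self.max_retained = max_retained
        self._retained: Deque[str] = deque()  # oldest on the left

    def on_checkpoint_completed(self, path: str) -> List[str]:
        """Record a successful checkpoint and return the paths that can be discarded."""
        self._retained.append(path)
        discarded: List[str] = []
        while len(self._retained) > self.max_retained:
            discarded.append(self._retained.popleft())
        return discarded

    def retained(self) -> List[str]:
        """Return the retained checkpoint paths, oldest first."""
        return list(self._retained)


# max_retained=1 models the current behaviour described in this message.
policy = CheckpointRetention(max_retained=1)
policy.on_checkpoint_completed("chk-1")
to_delete = policy.on_checkpoint_completed("chk-2")  # "chk-1" is superseded
```

With max_retained=1 every completed checkpoint supersedes the previous one,
so no external cleanup script is needed in this model.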

Cheers,
Aljoscha

On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

> Hi Gyula,
>
> Thanks a lot for your response, it was very clear. I understand that there
> is no problem of small files due to checkpointing not being incremental. I
> also understand that each worker will interpret a file:// URL as local to
> its own file system, which works OK if all workers have a remote file
> system mounted at the same local path.
>
> Now I have to check if Flink provides some expiration mechanism for old
> checkpoints, although for S3 that is already available, and for HDFS I
> guess some script that periodically deletes older files with hdfs dfs
> -rmr should be easy enough. Is there any documentation about some naming
> convention for checkpoint files that I could rely on to delete old
> checkpoints? E.g. some naming scheme that uses dates. It would be nicer if
> it was documented because then it would be more stable.
>
> Thanks again for your help.
>
> Greetings,
>
> Juan

Re: About stateful transformations

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Gyula,

Thanks a lot for your response, it was very clear. I understand that there
is no problem of small files due to checkpointing not being incremental. I
also understand that each worker will interpret a file:// URL as local to
its own file system, which works OK if all workers have a remote file
system mounted at the same local path.

Now I have to check if Flink provides some expiration mechanism for old
checkpoints, although for S3 that is already available, and for HDFS I
guess some script that periodically deletes older files with hdfs dfs
-rmr should be easy enough. Is there any documentation about some naming
convention for checkpoint files that I could rely on to delete old
checkpoints? E.g. some naming scheme that uses dates. It would be nicer if
it was documented because then it would be more stable.

Thanks again for your help.

Greetings,

Juan


On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra <gy...@apache.org> wrote:

> Hi Juan,
>
> Let me try to answer some of your questions :)
>
> We have been running Flink Streaming at King for quite some time now with
> multiple jobs having several hundred gigabytes of KV state stored in
> RocksDB. I would say the RocksDB state backend is definitely the best
> choice at the moment for large deployments, as you can also keep the heap
> relatively small to save some time on GC. But you have to play around with
> the RocksDB configuration to get the most out of it depending on your
> hardware.
>
> I am not aware of any caching/TTL functionality exposed in the Flink APIs
> currently. But if you are willing to dig a little deeper you could
> implement a lower-level operator that uses timers, like the windowing
> mechanisms do, to clear state after some time.
>
> When you are selecting a checkpoint directory (URL) you need to make sure
> that it is accessible from all the task managers. HDFS is convenient but is
> not strictly necessary. We, for instance, use Ceph, which is mounted as a
> regular disk from the OS's perspective, so we can use file:// and still
> save to the distributed storage. As far as I know, using YARN doesn't give
> much benefit; I am not sure if Flink exploits any data locality at this
> moment.
>
> When you are running the RocksDB state backend there are two concepts you
> need to think about for checkpointing: your local RocksDB directory and
> the checkpoint directory. The local directory is where the RocksDB
> instances are created, and they live on the task manager's local
> disk/memory. When Flink takes a checkpoint, a backup of all K-V pairs is
> copied as one blob to HDFS or to the selected checkpoint directory. This
> means there is no data fragmentation in the checkpoints. The same applies
> to the FsStateBackend, but that keeps the local state strictly in memory.
>
> I think you should definitely give RocksDB + HDFS a try. It works
> extremely well for very large state sizes given some tuning, but should
> also perform well out of the box :)
>
> Cheers,
> Gyula

Re: About stateful transformations

Posted by Gyula Fóra <gy...@apache.org>.
Hi Juan,

Let me try to answer some of your questions :)

We have been running Flink Streaming at King for quite some time now with
multiple jobs having several hundred gigabytes of KV state stored in
RocksDB. I would say the RocksDB state backend is definitely the best choice
at the moment for large deployments, as you can also keep the heap
relatively small to save some time on GC. But you have to play around with
the RocksDB configuration to get the most out of it depending on your
hardware.

I am not aware of any caching/TTL functionality exposed in the Flink APIs
currently. But if you are willing to dig a little deeper you could
implement a lower-level operator that uses timers, like the windowing
mechanisms do, to clear state after some time.
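
To make the timer idea concrete, here is a minimal Python sketch of per-key
state that is cleared when a timer fires. It is a standalone model, not
Flink code: the class ExpiringKeyedState, its methods and the ttl_ms
parameter are hypothetical names, and time is passed in explicitly instead
of coming from a real timer service.

```python
import heapq
from typing import Any, Dict, List, Optional, Tuple


class ExpiringKeyedState:
    """Toy model of keyed state that a timer clears `ttl_ms` after the last update."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._values: Dict[str, Any] = {}
        self._deadlines: Dict[str, int] = {}
        self._timers: List[Tuple[int, str]] = []  # min-heap of (fire_time, key)

    def update(self, key: str, value: Any, now_ms: int) -> None:
        """Store the value and register a timer for its expiry."""
        deadline = now_ms + self.ttl_ms
        self._values[key] = value
        self._deadlines[key] = deadline
        heapq.heappush(self._timers, (deadline, key))

    def get(self, key: str) -> Optional[Any]:
        """Return the current value for the key, or None if it was cleared."""
        return self._values.get(key)

    def advance_time(self, now_ms: int) -> List[str]:
        """Fire every timer due at `now_ms` and return the keys that were cleared."""
        cleared: List[str] = []
        while self._timers and self._timers[0][0] <= now_ms:
            fire_time, key = heapq.heappop(self._timers)
            # A later update moved the deadline, so this timer is stale: skip it.
            if self._deadlines.get(key) != fire_time:
                continue
            del self._values[key]      # the equivalent of calling clear()
            del self._deadlines[key]
            cleared.append(key)
        return cleared


state = ExpiringKeyedState(ttl_ms=1000)
state.update("a", 1, now_ms=0)
state.update("b", 2, now_ms=500)
state.update("a", 3, now_ms=800)            # refreshes the deadline of "a"
cleared_first = state.advance_time(1500)    # only "b" is due
cleared_second = state.advance_time(2000)   # now "a" is due as well
```

Stale timers are skipped rather than cancelled, which keeps update() cheap
at the cost of a few extra heap entries.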

When you are selecting a checkpoint directory (URL) you need to make sure
that it is accessible from all the task managers. HDFS is convenient but is
not strictly necessary. We, for instance, use Ceph, which is mounted as a
regular disk from the OS's perspective, so we can use file:// and still save
to the distributed storage. As far as I know, using YARN doesn't give much
benefit; I am not sure if Flink exploits any data locality at this moment.

When you are running the RocksDB state backend there are two concepts you
need to think about for checkpointing: your local RocksDB directory and the
checkpoint directory. The local directory is where the RocksDB instances are
created, and they live on the task manager's local disk/memory. When Flink
takes a checkpoint, a backup of all K-V pairs is copied as one blob to HDFS
or to the selected checkpoint directory. This means there is no data
fragmentation in the checkpoints. The same applies to the FsStateBackend,
but that keeps the local state strictly in memory.
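
The "one blob per checkpoint" point can be illustrated with a short Python
sketch: however many keys the working state holds, a full snapshot produces
a single file in the checkpoint directory. This is only a model under
simplifying assumptions (an in-memory dict as the local state, JSON as the
format, a local temporary directory standing in for HDFS); the function
names and the "snapshot-N.json" naming are hypothetical.

```python
import json
import os
import tempfile
from typing import Dict


def write_full_snapshot(state: Dict[str, str], checkpoint_dir: str, checkpoint_id: int) -> str:
    """Serialize every key-value pair into one file and return its path."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, f"snapshot-{checkpoint_id}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, sort_keys=True)
    return path


def restore_snapshot(path: str) -> Dict[str, str]:
    """Load the complete state back from a single snapshot file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# 1000 keys in the working state still yield exactly one file per checkpoint.
local_state = {f"key-{i}": str(i) for i in range(1000)}
with tempfile.TemporaryDirectory() as checkpoint_dir:
    snapshot_path = write_full_snapshot(local_state, checkpoint_dir, checkpoint_id=1)
    files_written = os.listdir(checkpoint_dir)
    restored = restore_snapshot(snapshot_path)
```

The file count in the checkpoint directory depends on the number of
checkpoints kept, not on the number of keys, which is why the small files
problem does not show up in this model.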

I think you should definitely give RocksDB + HDFS a try. It works extremely
well for very large state sizes given some tuning, but should also perform
well out of the box :)

Cheers,
Gyula
