Posted to user@flink.apache.org by Josh <jo...@gmail.com> on 2016/10/24 17:06:22 UTC

Checkpointing large RocksDB state to S3 - tips?

Hi all,

I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
checkpointing a fairly large RocksDB state to S3.
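
For context, the state backend is set up roughly like this (the bucket/path
is a placeholder, not the real one):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointToS3 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Keyed state lives in local RocksDB instances on each task manager;
            // each checkpoint copies that state to the S3 checkpoint directory.
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints"));
            // ... rest of the job ...
        }
    }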

I've found that when the state size hits 10GB, the checkpoint takes around
6 minutes, according to the Flink dashboard. Originally my checkpoint
interval was 5 minutes for the job, but I've found that the YARN container
crashes (I guess because the checkpoint time is greater than the checkpoint
interval), so have now decreased the checkpoint frequency to every 10
minutes.
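
The checkpointing itself is configured along these lines (the min-pause and
max-concurrent settings are things I'm trying based on the docs, not a
verified fix for the crashes):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Trigger a checkpoint every 10 minutes (was 5 minutes originally).
            env.enableCheckpointing(10 * 60 * 1000L);
            // Keep at most one checkpoint in flight and leave a gap before the
            // next one starts, so a slow S3 upload can't stack up with the
            // next trigger.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
        }
    }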

I was just wondering if anyone has any tips about how to reduce the
checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
uploading at ~30MB/sec. I believe the m3.xlarge instances should have
around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
Since there are 2 instances, I'm not sure if that means each instance is
uploading at 15MB/sec - do the state uploads get shared equally among the
instances, assuming the state is split equally between the task managers?

If the state upload is split between the instances, perhaps the only way to
speed up the checkpoints is to add more instances and task managers, and
split the state equally among the task managers?

Also just wondering - is there any chance the incremental checkpoints work
will be complete any time soon?

Thanks,
Josh

Re: Checkpointing large RocksDB state to S3 - tips?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Josh,
might the bandwidth to S3 be shared by all the running nodes? (Not sure how
that is set up, so I'm just guessing here.)

If you're on 1.2-SNAPSHOT you should also get fully elastic jobs in about a
week. (I'm talking about the ability to restart from a savepoint with a
different parallelism here.) Keyed state and the Kafka source are already
properly elastic; the only missing bit is the timers (for the window
operator, mostly), but we're currently working on making those elastic as
well.
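
Once that lands, the rough pattern (sketched against the current 1.2-SNAPSHOT
API, so treat the details as an assumption) is to give the job a max
parallelism, which fixes the number of key groups the keyed state is hashed
into, and then resubmit from a savepoint with any parallelism up to that
bound:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RescalableJobSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Upper bound for later rescaling; keyed state is sharded into this
            // many key groups, which are redistributed when parallelism changes.
            env.setMaxParallelism(128);
            env.setParallelism(10);
            // ... build the job, take a savepoint, then restart from that
            // savepoint with a different parallelism ...
        }
    }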

Cheers,
Aljoscha

On Tue, 25 Oct 2016 at 16:43 Josh <jo...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thanks for the reply!
>
> I found that my stateful operator (with parallelism 10) wasn't equally
> split between the task managers on the two nodes (it was split 9/1) - so I
> tweaked the task manager / slot configuration until Flink allocated them
> equally with 5 instances of the operator on each node. (Just wondering if
> there's a better way to get Flink to allocate this specific operator
> equally between nodes, regardless of the number of slots available on
> each?) Having split the stateful operator equally between 2 nodes, I am
> actually able to checkpoint 18.5GB of state in ~4 minutes, which indicates
> an overall throughput of ~77MB/sec (38.5MB/sec per node).
>
> I did what you said and tried uploading a large file from one of those VMs
> to S3 using the AWS command line tool. It uploaded at a speed of ~76MB/sec.
> That's roughly double the 38.5MB/sec per node, but at least it's not orders
> of magnitude out. Does that sound ok? I guess there's more that goes on when
> Flink takes a checkpoint than just uploading anyway... I upgraded my cluster
> to Flink 1.2-SNAPSHOT yesterday, so I should be using the fully async mode.
>
> I'll have a proper look in the logs if I see it crash again, and for now
> will just add more nodes whenever we need to speed up the checkpointing.
>
> Thanks,
> Josh
>
>
>
>
> On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
>
> Regarding upload speed, yes, each instance of an operator is responsible
> for uploading its state, so if state is equally distributed between
> operators on TaskManagers, each TaskManager would upload roughly the same
> amount of state. It might be interesting to see what the raw upload speed is
> when you have those two VMs upload to S3; if it is a lot larger than the
> speed you're seeing, something would be wrong and
> we should investigate. One last thing: are you using the "fully async" mode
> of RocksDB? I think I remember that you do, just checking.
>
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
>
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning but after that, I don't know yet.
>
> Cheers,
> Aljoscha
>
>
> On Mon, 24 Oct 2016 at 19:06 Josh <jo...@gmail.com> wrote:
>
> Hi all,
>
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
>
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so have now decreased the checkpoint frequency to every 10
> minutes.
>
> I was just wondering if anyone has any tips about how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
>
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
>
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
>
> Thanks,
> Josh
>
>
>

Re: Checkpointing large RocksDB state to S3 - tips?

Posted by Josh <jo...@gmail.com>.
Hi Aljoscha,

Thanks for the reply!

I found that my stateful operator (with parallelism 10) wasn't equally
split between the task managers on the two nodes (it was split 9/1) - so I
tweaked the task manager / slot configuration until Flink allocated them
equally with 5 instances of the operator on each node. (Just wondering if
there's a better way to get Flink to allocate this specific operator
equally between nodes, regardless of the number of slots available on
each?) Having split the stateful operator equally between 2 nodes, I am
actually able to checkpoint 18.5GB of state in ~4 minutes, which indicates
an overall throughput of ~77MB/sec (38.5MB/sec per node).

I did what you said and tried uploading a large file from one of those VMs
to S3 using the AWS command line tool. It uploaded at a speed of ~76MB/sec.
That's roughly double the 38.5MB/sec per node, but at least it's not orders
of magnitude out. Does that sound ok? I guess there's more that goes on when
Flink takes a checkpoint than just uploading anyway... I upgraded my cluster
to Flink 1.2-SNAPSHOT yesterday, so I should be using the fully async mode.

I'll have a proper look in the logs if I see it crash again, and for now
will just add more nodes whenever we need to speed up the checkpointing.

Thanks,
Josh




On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
>
> Regarding upload speed, yes, each instance of an operator is responsible
> for uploading its state, so if state is equally distributed between
> operators on TaskManagers, each TaskManager would upload roughly the same
> amount of state. It might be interesting to see what the raw upload speed is
> when you have those two VMs upload to S3; if it is a lot larger than the
> speed you're seeing, something would be wrong and
> we should investigate. One last thing: are you using the "fully async" mode
> of RocksDB? I think I remember that you do, just checking.
>
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
>
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning but after that, I don't know yet.
>
> Cheers,
> Aljoscha
>
>
> On Mon, 24 Oct 2016 at 19:06 Josh <jo...@gmail.com> wrote:
>
> Hi all,
>
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
>
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so have now decreased the checkpoint frequency to every 10
> minutes.
>
> I was just wondering if anyone has any tips about how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
>
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
>
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
>
> Thanks,
> Josh
>
>

Re: Checkpointing large RocksDB state to S3 - tips?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Josh,
Checkpoints that take longer than the checkpoint interval should not be an
issue (if you use an up-to-date version of Flink). The checkpoint
coordinator will not issue another checkpoint while another one is still
ongoing. Is there maybe some additional data for the crashes? A log perhaps?

Regarding upload speed, yes, each instance of an operator is responsible
for uploading its state, so if state is equally distributed between
operators on TaskManagers, each TaskManager would upload roughly the same
amount of state. It might be interesting to see what the raw upload speed is
when you have those two VMs upload to S3; if it is a lot larger than the
speed you're seeing, something would be wrong and
we should investigate. One last thing: are you using the "fully async" mode
of RocksDB? I think I remember that you do, just checking.
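
For reference, on 1.1 that mode is enabled when constructing the backend,
roughly like this (the checkpoint URI below is just a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FullyAsyncSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints");
            // Snapshot via RocksDB's own backup mechanism, so materializing and
            // uploading the state happens in the background instead of blocking
            // the pipeline.
            backend.enableFullyAsyncSnapshots();
            env.setStateBackend(backend);
        }
    }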

If it is indeed a problem of upload speed to S3 per machine then yes, using
more instances should speed up checkpointing.

About incremental checkpoints: they're not going to make it into 1.2 with
the current planning but after that, I don't know yet.

Cheers,
Aljoscha


On Mon, 24 Oct 2016 at 19:06 Josh <jo...@gmail.com> wrote:

Hi all,

I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
checkpointing a fairly large RocksDB state to S3.

I've found that when the state size hits 10GB, the checkpoint takes around
6 minutes, according to the Flink dashboard. Originally my checkpoint
interval was 5 minutes for the job, but I've found that the YARN container
crashes (I guess because the checkpoint time is greater than the checkpoint
interval), so have now decreased the checkpoint frequency to every 10
minutes.

I was just wondering if anyone has any tips about how to reduce the
checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
uploading at ~30MB/sec. I believe the m3.xlarge instances should have
around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
Since there are 2 instances, I'm not sure if that means each instance is
uploading at 15MB/sec - do the state uploads get shared equally among the
instances, assuming the state is split equally between the task managers?

If the state upload is split between the instances, perhaps the only way to
speed up the checkpoints is to add more instances and task managers, and
split the state equally among the task managers?

Also just wondering - is there any chance the incremental checkpoints work
will be complete any time soon?

Thanks,
Josh