Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/03/08 06:59:18 UTC
Gradually increasing checkpoint size
Hi!
I'm running a backfill Flink stream job over older data. It has multiple
interval joins. I noticed my checkpoint is regularly gaining in size. I'd
expect my checkpoints to stabilize and not grow.
Is there a setting to prune useless data from the checkpoint? My top guess
is that my checkpoint has a bunch of useless state in it.
- Dan
Re: Gradually increasing checkpoint size
Posted by Dan Hill <qu...@gmail.com>.
I figured it out. I have some records with the same key, and I was doing an
IntervalJoin. One of the IntervalJoin implementations that I found looks
like its runtime grows super-linearly when there are duplicate keys.
I introduced a de-duping step, and it now runs a lot faster.
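A toy illustration of why duplicate keys hurt: the stdlib-only Java sketch below is not Flink's IntervalJoinOperator. It is a naive nested-loop interval join over hypothetical `Rec` records, plus one possible de-duplication policy (keep the first record per key). With n records per key on each side, all inside the interval, the number of matched pairs (and so the work and output) grows with n², while the de-duplicated inputs produce a single pair.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of an interval join, NOT Flink's IntervalJoinOperator.
class IntervalJoinDupDemo {

    // A keyed, timestamped record (hypothetical type for illustration).
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    // Counts pairs (l, r) with the same key and l.ts + lower <= r.ts <= l.ts + upper.
    static long countMatches(List<Rec> left, List<Rec> right, long lower, long upper) {
        long pairs = 0;
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && r.ts >= l.ts + lower && r.ts <= l.ts + upper) {
                    pairs++;
                }
            }
        }
        return pairs;
    }

    // De-duplication step (assumed policy): keep only the first record seen per key.
    static List<Rec> dedupByKey(List<Rec> in) {
        Map<String, Rec> first = new LinkedHashMap<>();
        for (Rec r : in) {
            first.putIfAbsent(r.key, r);
        }
        return new ArrayList<>(first.values());
    }

    // n copies of the same key at the same timestamp.
    static List<Rec> duplicates(String key, long ts, int n) {
        List<Rec> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new Rec(key, ts));
        }
        return out;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            List<Rec> left = duplicates("user-1", 1_000L, n);
            List<Rec> right = duplicates("user-1", 1_500L, n);
            long raw = countMatches(left, right, 0L, 1_000L);
            long deduped = countMatches(dedupByKey(left), dedupByKey(right), 0L, 1_000L);
            System.out.println("n=" + n + " raw pairs=" + raw + " after dedup=" + deduped);
        }
    }
}
```

The raw pair counts are 100, 10,000 and 1,000,000, while each de-duplicated run yields 1. Whether keeping only the first record per key is the right policy depends on the data; it is an assumption here, not necessarily the de-duping step used in this job.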
On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz <dw...@apache.org>
wrote:
> Hey Dan,
>
> I think the logic should be correct. Note that in processElement we are
> using *relative*Upper/LowerBound, which are the global bounds inverted for
> the right side:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
>
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleanup in onEventTime effectively uses the same logic as the
> timer registration. If I understand it correctly, this trick was introduced
> so that the same method can handle both sides.
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest first investigating the progress of watermarks.
>
> Best,
>
> Dawid
Re: Gradually increasing checkpoint size
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Dan,
I think the logic should be correct. Note that in processElement we are
using *relative*Upper/LowerBound, which are the global bounds inverted for
the right side:
relativeUpperBound = upperBound for left and -lowerBound for right
relativeLowerBound = lowerBound for left and -upperBound for right
Therefore the cleanup in onEventTime effectively uses the same logic as the
timer registration. If I understand it correctly, this trick was introduced
so that the same method can handle both sides.
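The symmetry can be checked with plain arithmetic. The stdlib-only Java sketch below encodes one reading of the linked IntervalJoinOperator code at the time, which is an assumption rather than a copy: when an element is buffered, a cleanup timer is registered at ts + relativeUpperBound if that is positive (otherwise at ts), and onEventTime maps the timer back to a buffer timestamp by subtracting upperBound on the left side when upperBound is positive, and by adding lowerBound on the right side when lowerBound is not positive. The method names are hypothetical. Because the right side passes -lowerBound as its relativeUpperBound, both sides recover exactly the timestamp they buffered, whatever the signs of the bounds.

```java
// Re-derivation (assumed, simplified) of IntervalJoinOperator's cleanup arithmetic.
class CleanupTimerCheck {

    // Timer registered when an element with timestamp ts is buffered.
    // Left side passes relativeUpperBound = upperBound;
    // right side passes relativeUpperBound = -lowerBound.
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    // Buffer timestamp removed when a left-side timer fires.
    static long leftBufferKey(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    // Buffer timestamp removed when a right-side timer fires.
    static long rightBufferKey(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }

    // True if both sides remove exactly the timestamp they buffered.
    static boolean roundTrips(long ts, long lowerBound, long upperBound) {
        long leftTimer = cleanupTime(ts, upperBound);   // left input
        long rightTimer = cleanupTime(ts, -lowerBound); // right input, inverted bound
        return leftBufferKey(leftTimer, upperBound) == ts
                && rightBufferKey(rightTimer, lowerBound) == ts;
    }

    public static void main(String[] args) {
        long[][] bounds = {{-5, 5}, {-10, -2}, {2, 10}, {0, 0}, {-3, 0}, {0, 3}};
        for (long[] b : bounds) {
            System.out.println("lower=" + b[0] + " upper=" + b[1]
                    + " roundTrips=" + roundTrips(1_000L, b[0], b[1]));
        }
    }
}
```

Every pair in the list (with lowerBound <= upperBound, as the interval join requires) round-trips, including a positive upper bound on the left side.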
There might be a bug somewhere, but I don't think it's where you
pointed. I'd suggest first investigating the progress of watermarks.
Best,
Dawid
On 09/03/2021 08:36, Dan Hill wrote:
> Hi Yun!
>
> That advice was useful. The state for that operator is very small
> (31 KB). Most of the checkpoint size is in a couple of simple
> DataStream.intervalJoin operators. The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration
> changes. One thing I did notice is that I set a positive value for
> the relativeUpperBound. I'm not sure if I found a bug in
> IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for cleanup. It has some logic around cleaning up the right side
> that uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
> However, processElement doesn’t use the same logic when creating a
> timer (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
> Maybe I'm misreading the code. It feels like a bug.
Re: Re: Gradually increasing checkpoint size
Posted by Dan Hill <qu...@gmail.com>.
Hi Yun!
That advice was useful. The state for that operator is very small (31 KB).
Most of the checkpoint size is in a couple of simple DataStream.intervalJoin
operators. The time intervals are fairly short.
I'm going to try running the code with some small configuration changes.
One thing I did notice is that I set a positive value for the
relativeUpperBound. I'm not sure if I found a bug in IntervalJoinOperator
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
cleanup. It has some logic around cleaning up the right side that uses
timerTimestamp + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
However, processElement doesn’t use the same logic when creating a timer (I
only see + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
Maybe I'm misreading the code. It feels like a bug.
On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yu...@aliyun.com> wrote:
> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also check in
> the checkpoint UI which tasks' state is increasing? For example, the
> attached operator has an `alreadyOutputted` value state, which would seem
> to keep increasing if there are always new keys?
>
> Best,
> Yun
Re: Re: Gradually increasing checkpoint size
Posted by Yun Gao <yu...@aliyun.com>.
Hi Dan,
Regarding the original checkpoint size problem, could you also check in the
checkpoint UI which tasks' state is increasing? For example, the attached
operator has an `alreadyOutputted` value state, which would seem to keep
increasing if there are always new keys?
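To illustrate the point: a per-key flag like alreadyOutputted keeps one entry for every key ever seen, so with a steady stream of new keys that state only grows. Flink's state TTL (`StateTtlConfig`, enabled on a state descriptor via `enableTimeToLive`) is the built-in way to expire such entries. The stdlib-only Java sketch below models only the idea with a hypothetical `FirstPerKey` class; it is not Flink's TTL implementation. Note the trade-off: once a key's entry expires, its next value is emitted again.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy "emit only the first value per key" filter with optional time-based expiry.
class FirstPerKey {
    private final Map<String, Long> firstSeenAt = new HashMap<>();
    private final long ttlMillis; // Long.MAX_VALUE means "never expire"

    FirstPerKey(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if this is the first value for the key (and remembers the key).
    boolean offer(String key, long nowMillis) {
        expire(nowMillis);
        return firstSeenAt.putIfAbsent(key, nowMillis) == null;
    }

    // Drops entries older than the TTL (a linear scan keeps the sketch short).
    private void expire(long nowMillis) {
        if (ttlMillis == Long.MAX_VALUE) {
            return;
        }
        Iterator<Map.Entry<String, Long>> it = firstSeenAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
            }
        }
    }

    int size() {
        return firstSeenAt.size();
    }

    public static void main(String[] args) {
        FirstPerKey noTtl = new FirstPerKey(Long.MAX_VALUE);
        FirstPerKey withTtl = new FirstPerKey(100);
        for (int i = 0; i < 10_000; i++) {
            String key = "user-" + i; // every event carries a new key
            noTtl.offer(key, i);
            withTtl.offer(key, i);
        }
        System.out.println("no TTL:   " + noTtl.size());
        System.out.println("with TTL: " + withTtl.size());
    }
}
```

With 10,000 distinct keys and a TTL of 100 time units, the unbounded map ends with 10,000 entries and the expiring one with 101.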
Best,
Yun
> ------------------Original Mail ------------------
> Sender:Dan Hill <qu...@gmail.com>
> Send Date:Tue Mar 9 00:59:24 2021
> Recipients:Yun Gao <yu...@aliyun.com>
> CC:user <us...@flink.apache.org>
> Subject:Re: Gradually increasing checkpoint size
>
> Hi Yun!
>
> Thanks for the quick reply.
>
> One of the lowerBounds is large but the table being joined with is ~500
> rows. I also have my own operator that only outputs the first value.
>
> public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {
>
>     private transient ValueState<Boolean> alreadyOutputted;
>
>     @Override
>     public void flatMap(T value, Collector<T> out) throws Exception {
>         if (!alreadyOutputted.value()) {
>             alreadyOutputted.update(true);
>             out.collect(value);
>         }
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Boolean> descriptor =
>                 new ValueStateDescriptor<>(
>                         "alreadyOutputted", // the state name
>                         TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>                         false); // default value of the state, if nothing was set
>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>     }
> }
>
> All of my inputs have this watermark strategy. In the Flink UI, early in
> the job run, I see "Low Watermarks" on each node and they increase. After
> some checkpoint failures, low watermarks stop appearing in the UI.
>
> .assignTimestampsAndWatermarks(
>     WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>         .withIdleness(Duration.ofMinutes(1)));
>
> Thanks Yun!
Re: Gradually increasing checkpoint size
Posted by Dan Hill <qu...@gmail.com>.
Hi Yun!
Thanks for the quick reply.
One of the lowerBounds is large but the table being joined with is ~500
rows. I also have my own operator that only outputs the first value.
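import com.google.protobuf.GeneratedMessageV3;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits only the first element seen for each key (must run on a keyed stream).
public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {

    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        // value() is null until the flag has been set for the current key.
        if (alreadyOutputted.value() == null) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("alreadyOutputted", Types.BOOLEAN);
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }
}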
All of my inputs have this watermark strategy. In the Flink UI, early in
the job run, I see "Low Watermarks" on each node and they increase. After
some checkpoint failures, low watermarks stop appearing in the UI
<https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.

.assignTimestampsAndWatermarks(
    WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        .withIdleness(Duration.ofMinutes(1)));
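To connect this watermark strategy to state cleanup, here is a simplified stdlib-only Java model (not Flink's implementation; the names are hypothetical). As far as I know, Flink's bounded-out-of-orderness generator emits maxTimestamp - delay - 1, and an operator with several inputs, such as an interval join, takes the minimum watermark over the inputs that are not marked idle. The model shows how one input that stops producing newer timestamps, without yet being marked idle, holds the join's event time (and thus its cleanup timers) in place.

```java
import java.util.List;
import java.util.OptionalLong;

// Simplified model of watermark generation and combination; not Flink's implementation.
class WatermarkModel {

    // Watermark trails the largest timestamp seen by a fixed out-of-orderness delay.
    static final class BoundedOutOfOrderness {
        private final long delayMillis;
        private long maxTimestamp;
        boolean idle = false; // set externally in this toy model (e.g. after an idleness timeout)

        BoundedOutOfOrderness(long delayMillis) {
            this.delayMillis = delayMillis;
            // Start low enough that the first watermark does not underflow.
            this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            idle = false; // a new record makes the input active again
        }

        long currentWatermark() {
            return maxTimestamp - delayMillis - 1;
        }
    }

    // Event time of a multi-input operator: the minimum over inputs not marked idle.
    // Empty when every input is idle (no watermark progress in this model).
    static OptionalLong combined(List<BoundedOutOfOrderness> inputs) {
        OptionalLong min = OptionalLong.empty();
        for (BoundedOutOfOrderness in : inputs) {
            if (!in.idle) {
                long wm = in.currentWatermark();
                min = OptionalLong.of(min.isPresent() ? Math.min(min.getAsLong(), wm) : wm);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness left = new BoundedOutOfOrderness(1_000);
        BoundedOutOfOrderness right = new BoundedOutOfOrderness(1_000);
        List<BoundedOutOfOrderness> inputs = List.of(left, right);

        left.onEvent(10_000);
        right.onEvent(5_000);
        System.out.println("both active:       " + combined(inputs));

        left.onEvent(60_000); // the right input has gone quiet but is not idle yet
        System.out.println("right quiet:       " + combined(inputs));

        right.idle = true; // e.g. after the one-minute withIdleness timeout
        System.out.println("right marked idle: " + combined(inputs));
    }
}
```

Here the combined watermark stays at 3,999 after the left input jumps ahead, and only moves to 58,999 once the quiet input is treated as idle.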
Thanks Yun!
On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yu...@aliyun.com> wrote:
> Hi Dan,
>
> Have you used an overly large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy?
> The interval join operator depends on event-time
> timers for cleanup, and event-time timers are
> triggered by watermarks.
>
> Best,
> Yun
Re: Gradually increasing checkpoint size
Posted by Yun Gao <yu...@aliyun.com>.
Hi Dan,
Have you used an overly large upperBound or lowerBound?
If not, could you also check the watermark strategy?
The interval join operator depends on event-time
timers for cleanup, and event-time timers are
triggered by watermarks.
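A stdlib-only Java simulation of this dependency (not Flink code; the class and its methods are hypothetical): each buffered timestamp gets a cleanup timer at ts + retention, and timers fire only when the watermark reaches them. When the watermark keeps up with the data, the buffer stays at a constant size; when the watermark stalls, every new element stays buffered, which is one way state, and therefore checkpoint size, can grow steadily.

```java
import java.util.PriorityQueue;
import java.util.TreeMap;

// Toy model of event-time cleanup: state is only dropped when the watermark passes a timer.
class WatermarkCleanupSim {
    private final TreeMap<Long, Integer> buffer = new TreeMap<>(); // timestamp -> element count
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // pending cleanup times
    private final long retention; // event time an element must stay buffered
    private long watermark = Long.MIN_VALUE;

    WatermarkCleanupSim(long retention) {
        this.retention = retention;
    }

    // Buffer an element and register a cleanup timer for it.
    void add(long ts) {
        buffer.merge(ts, 1, Integer::sum);
        timers.add(ts + retention);
    }

    // Advance the watermark and fire every timer at or below it.
    void advanceWatermark(long wm) {
        watermark = Math.max(watermark, wm);
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long timerTs = timers.poll();
            buffer.remove(timerTs - retention); // drop the whole bucket for that timestamp
        }
    }

    int bufferedTimestamps() {
        return buffer.size();
    }

    public static void main(String[] args) {
        WatermarkCleanupSim advancing = new WatermarkCleanupSim(10);
        WatermarkCleanupSim stalled = new WatermarkCleanupSim(10);
        for (long ts = 0; ts < 1_000; ts++) {
            advancing.add(ts);
            stalled.add(ts);
            advancing.advanceWatermark(ts - 1);              // watermark keeps up with the data
            stalled.advanceWatermark(Math.min(ts - 1, 50));  // watermark stuck at 50
        }
        System.out.println("advancing: " + advancing.bufferedTimestamps());
        System.out.println("stalled:   " + stalled.bufferedTimestamps());
    }
}
```

With a retention of 10 and 1,000 consecutive timestamps, the advancing run ends with 11 buffered timestamps and the run whose watermark is stuck at 50 ends with 959.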
Best,
Yun
> ------------------Original Mail ------------------
> Sender:Dan Hill <qu...@gmail.com>
> Send Date:Mon Mar 8 14:59:48 2021
> Recipients:user <us...@flink.apache.org>
> Subject:Gradually increasing checkpoint size
>
> Hi!
>
> I'm running a backfill Flink stream job over older data. It has multiple
> interval joins. I noticed my checkpoint is regularly gaining in size. I'd
> expect my checkpoints to stabilize and not grow.
>
> Is there a setting to prune useless data from the checkpoint? My top guess
> is that my checkpoint has a bunch of useless state in it.
>
> - Dan