You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ori Popowski <or...@gmail.com> on 2020/07/08 12:21:59 UTC

Savepoint fails due to RocksDB 2GiB limit

I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268
but it's been inactive for two years so I'm not sure it will be visible.

While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException. It's happening because some of my
windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.

How can I prevent this?

As I understand it, I need somehow to limit the accumulated size of the
window I'm using, which is EventTimeWindow. However, I have no way of doing
so, because the WindowOperator manages its state on its own.

Below is a full stack trace.

org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
at org.rocksdb.RocksIterator.value0(Native Method)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
at
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 common frames omitted

Re: Savepoint fails due to RocksDB 2GiB limit

Posted by Ori Popowski <or...@gmail.com>.
Hi,

Eventually flatMapWithState solved the problem. I started by looking into
KeyedProcessFunction which lead me to flatMapWithState. It's working very
well.

.keyBy(…)
.flatMapWithState[Event, Int] { (event, countOpt) =>
  val count = countOpt.getOrElse(0)
  if (count < config.limit) (List(event), Some(count + 1))
  else (List.empty, Some(count))
}
.keyBy(…)

Using .aggregate(…, new MyProcessFunction) while using an aggregation to
aggregate the events into a list, worked really bad and caused serious
performance issues.

Thanks!

On Sun, Jul 12, 2020 at 10:32 AM Ori Popowski <or...@gmail.com> wrote:

> > AFAIK, current the 2GB limit is still there. as a workaround, maybe you
> can reduce the state size. If this can not be done using the window
> operator, can the keyedprocessfunction[1] be ok for you?
>
> I'll see if I can introduce it to the code.
>
> > if you do, the ProcessWindowFunction is getting as argument an Iterable
> with ALL elements collected along the session. This will make the state per
> key potentially huge (like you're experiencing).
>
> Thanks for noticing that. It's indeed true that we do this. The reason is
> the nature of the computation, which cannot be done incrementally
> unfortunately. It's not a classic avg(), max(), last() etc. computation
> which can be reduced in each step.
> I'm thinking of a way to cap the volume of the state per key using an
> aggregate function that limits the number of elements and returns a list of
> the collected events.
>
> class CappingAggregator(limit: Int) extends AggregateFunction[Event,
> Vector[Event], Vector[Event]] {
>   override def createAccumulator(): Vector[Event] = Vector.empty
>
>   override def add(value: Event, acc: Vector[Event]): Vector[Event] =
>     if (acc.size < limit) acc :+ value
>     else acc
>
>   override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc:
> _*)
>
>   override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
> (a ++ b).slice(0, limit)
> }
>
> My only problem is with merge(). I'm not sure if b is always later
> elements than a's or if I must sort and only then slice.
>
> On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch <ra...@gmail.com> wrote:
>
>> Hi Ori,
>>
>> In your code, are you using the process() API?
>>
>> .process(new MyProcessWindowFunction());
>>
>> if you do, the ProcessWindowFunction is getting as argument an Iterable
>> with ALL elements collected along the session. This will make the state per
>> key potentially huge (like you're experiencing).
>>
>> As Aljoscha Krettek suggested in the JIRA, if you can use the aggregate()
>> API and store in state only an aggregate that is getting incrementally
>> updated on every incoming event (this could be ONE Class / Map / Tuple /
>> etc) rather than keeping ALL elements.
>>
>> See example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qc...@gmail.com>
>> wrote:
>>
>>> Hi Ori
>>>
>>> AFAIK, current the 2GB limit is still there. as a workaround, maybe you
>>> can reduce the state size. If this can not be done using the window
>>> operator, can the keyedprocessfunction[1] be ok for you?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Ori Popowski <or...@gmail.com> 于2020年7月8日周三 下午8:30写道:
>>>
>>>> I've asked this question in
>>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been
>>>> inactive for two years so I'm not sure it will be visible.
>>>>
>>>> While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
>>>> java.lang.NegativeArraySizeException. It's happening because some of
>>>> my windows have a keyed state of more than 2GiB, hitting RocksDB memory
>>>> limit.
>>>>
>>>> How can I prevent this?
>>>>
>>>> As I understand it, I need somehow to limit the accumulated size of the
>>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>>> doing so, because the WindowOperator manages its state on its own.
>>>>
>>>> Below is a full stack trace.
>>>>
>>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>>>> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
>>>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>>>> Unnamed (23/189).
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>>> java.lang.NegativeArraySizeException
>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>>>> at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>>> ... 3 common frames omitted
>>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>>> at org.rocksdb.RocksIterator.value0(Native Method)
>>>> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>>>> at
>>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>>> ... 5 common frames omitted
>>>>
>>>

Re: Savepoint fails due to RocksDB 2GiB limit

Posted by Ori Popowski <or...@gmail.com>.
> AFAIK, current the 2GB limit is still there. as a workaround, maybe you
can reduce the state size. If this can not be done using the window
operator, can the keyedprocessfunction[1] be ok for you?

I'll see if I can introduce it to the code.

> if you do, the ProcessWindowFunction is getting as argument an Iterable
with ALL elements collected along the session. This will make the state per
key potentially huge (like you're experiencing).

Thanks for noticing that. It's indeed true that we do this. The reason is
the nature of the computation, which cannot be done incrementally
unfortunately. It's not a classic avg(), max(), last() etc. computation
which can be reduced in each step.
I'm thinking of a way to cap the volume of the state per key using an
aggregate function that limits the number of elements and returns a list of
the collected events.

class CappingAggregator(limit: Int) extends AggregateFunction[Event,
Vector[Event], Vector[Event]] {
  override def createAccumulator(): Vector[Event] = Vector.empty

  override def add(value: Event, acc: Vector[Event]): Vector[Event] =
    if (acc.size < limit) acc :+ value
    else acc

  override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc:
_*)

  override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
(a ++ b).slice(0, limit)
}

My only problem is with merge(). I'm not sure if b is always later elements
than a's or if I must sort and only then slice.

On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch <ra...@gmail.com> wrote:

> Hi Ori,
>
> In your code, are you using the process() API?
>
> .process(new MyProcessWindowFunction());
>
> if you do, the ProcessWindowFunction is getting as argument an Iterable
> with ALL elements collected along the session. This will make the state per
> key potentially huge (like you're experiencing).
>
> As Aljoscha Krettek suggested in the JIRA, if you can use the aggregate()
> API and store in state only an aggregate that is getting incrementally
> updated on every incoming event (this could be ONE Class / Map / Tuple /
> etc) rather than keeping ALL elements.
>
> See example here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>
> Thanks,
> Rafi
>
>
> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qc...@gmail.com>
> wrote:
>
>> Hi Ori
>>
>> AFAIK, current the 2GB limit is still there. as a workaround, maybe you
>> can reduce the state size. If this can not be done using the window
>> operator, can the keyedprocessfunction[1] be ok for you?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>
>> Best,
>> Congxian
>>
>>
>> Ori Popowski <or...@gmail.com> 于2020年7月8日周三 下午8:30写道:
>>
>>> I've asked this question in
>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
>>> for two years so I'm not sure it will be visible.
>>>
>>> While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException. It's happening because some of my
>>> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>>>
>>> How can I prevent this?
>>>
>>> As I understand it, I need somehow to limit the accumulated size of the
>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>> doing so, because the WindowOperator manages its state on its own.
>>>
>>> Below is a full stack trace.
>>>
>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>>> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>>> Unnamed (23/189).
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>>> at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>> ... 3 common frames omitted
>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>> at org.rocksdb.RocksIterator.value0(Native Method)
>>> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>>> at
>>> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>> ... 5 common frames omitted
>>>
>>

Re: Savepoint fails due to RocksDB 2GiB limit

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Ori,

In your code, are you using the process() API?

.process(new MyProcessWindowFunction());

if you do, the ProcessWindowFunction is getting as argument an Iterable
with ALL elements collected along the session. This will make the state per
key potentially huge (like you're experiencing).

As Aljoscha Krettek suggested in the JIRA, if you can use the aggregate()
API and store in state only an aggregate that is getting incrementally
updated on every incoming event (this could be ONE Class / Map / Tuple /
etc) rather than keeping ALL elements.

See example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction

Thanks,
Rafi


On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qc...@gmail.com>
wrote:

> Hi Ori
>
> AFAIK, current the 2GB limit is still there. as a workaround, maybe you
> can reduce the state size. If this can not be done using the window
> operator, can the keyedprocessfunction[1] be ok for you?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> Best,
> Congxian
>
>
> Ori Popowski <or...@gmail.com> 于2020年7月8日周三 下午8:30写道:
>
>> I've asked this question in
>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
>> for two years so I'm not sure it will be visible.
>>
>> While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
>> java.lang.NegativeArraySizeException. It's happening because some of my
>> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>>
>> How can I prevent this?
>>
>> As I understand it, I need somehow to limit the accumulated size of the
>> window I'm using, which is EventTimeWindow. However, I have no way of
>> doing so, because the WindowOperator manages its state on its own.
>>
>> Below is a full stack trace.
>>
>> org.apache.flink.util.SerializedThrowable: Could not materialize
>> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>> Unnamed (23/189).
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.SerializedThrowable:
>> java.lang.NegativeArraySizeException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>> ... 3 common frames omitted
>> Caused by: org.apache.flink.util.SerializedThrowable: null
>> at org.rocksdb.RocksIterator.value0(Native Method)
>> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>> at
>> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>> at
>> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>> ... 5 common frames omitted
>>
>

Re: Savepoint fails due to RocksDB 2GiB limit

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Ori

AFAIK, current the 2GB limit is still there. as a workaround, maybe you can
reduce the state size. If this can not be done using the window operator,
can the keyedprocessfunction[1] be ok for you?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction

Best,
Congxian


Ori Popowski <or...@gmail.com> 于2020年7月8日周三 下午8:30写道:

> I've asked this question in
> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
> for two years so I'm not sure it will be visible.
>
> While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
> java.lang.NegativeArraySizeException. It's happening because some of my
> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>
> How can I prevent this?
>
> As I understand it, I need somehow to limit the accumulated size of the
> window I'm using, which is EventTimeWindow. However, I have no way of
> doing so, because the WindowOperator manages its state on its own.
>
> Below is a full stack trace.
>
> org.apache.flink.util.SerializedThrowable: Could not materialize
> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
> Unnamed (23/189).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.lang.NegativeArraySizeException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
> ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
> at org.rocksdb.RocksIterator.value0(Native Method)
> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
> at
> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
> at
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
> ... 5 common frames omitted
>