You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "guaishushu1103@163.com" <gu...@163.com> on 2021/03/03 10:52:44 UTC

flink-savepoint问题

checkpoint 可以成功保存,但是savepoint出现错误:
java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more


guaishushu1103@163.com

Re: flink-savepoint问题

Posted by Congxian Qiu <qc...@gmail.com>.
对于 keyed state,需要保证同一个 key 在 同一个 keygroup 中,如果是某个 key 有热点,可以在 keyby 之前进行一次
map(在 key 后面拼接一些 后缀),然后 keyby,最后处理完成之后,将这些进行聚合
Best,
Congxian


guomuhua <66...@qq.com> 于2021年3月4日周四 下午12:49写道:

> 我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
> nobleyd wrote
> > 是不是使用了随机key。
>
> > guaishushu1103@
>
> >  &lt;
>
> > guaishushu1103@
>
> > &gt; 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> > KeyedProcess (21/48).>     at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>
>
> > at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>
>
> > at java.lang.Thread.run(Thread.java:745)> Caused by:
> > java.util.concurrent.ExecutionException:>
> > java.lang.IllegalArgumentException: Key group 0 is not in>
> > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.>     at
> > java.util.concurrent.FutureTask.report(FutureTask.java:122)>     at
> > java.util.concurrent.FutureTask.get(FutureTask.java:192)>     at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>
>
> > at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.
>
> > (OperatorSnapshotFinalizer.java:47)>     at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>
>
> > ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> > not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.>     at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>
>
> > at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>
>
> > at>
> >
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>
>
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)>     at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>
>
> > ... 5 more>>>
>
> > guaishushu1103@
>
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-savepoint问题

Posted by guomuhua <66...@qq.com>.
我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
nobleyd wrote
> 是不是使用了随机key。

> guaishushu1103@

>  &lt;

> guaishushu1103@

> &gt; 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> KeyedProcess (21/48).>     at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>    
> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>    
> at>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>    
> at>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>    
> at java.lang.Thread.run(Thread.java:745)> Caused by:
> java.util.concurrent.ExecutionException:>
> java.lang.IllegalArgumentException: Key group 0 is not in>
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.>     at
> java.util.concurrent.FutureTask.report(FutureTask.java:122)>     at
> java.util.concurrent.FutureTask.get(FutureTask.java:192)>     at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>    
> at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.

> (OperatorSnapshotFinalizer.java:47)>     at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>    
> ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.>     at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>    
> at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>    
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>    
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>    
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>    
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>    
> at>
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>    
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)>     at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>    
> ... 5 more>>> 

> guaishushu1103@

>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-savepoint问题

Posted by guomuhua <66...@qq.com>.
我也遇到同样问题,为了打散数据,在keyby时加了随机数作为后缀,去掉随机数,可以正常savepoint,加上随机数就savepoint失败。所以如果确有要打散数据的需求,应该怎么处理呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-savepoint问题

Posted by yidan zhao <hi...@gmail.com>.
是不是使用了随机key。

guaishushu1103@163.com <gu...@163.com> 于2021年3月3日周三 下午6:53写道:

> checkpoint 可以成功保存,但是savepoint出现错误:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>     at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>     ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>     at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>     at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>     at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
>     at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>     at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>     at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>     at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     ... 5 more
>
>
> guaishushu1103@163.com
>