Posted to user-zh@flink.apache.org by wangwei <ga...@163.com> on 2023/03/06 12:53:18 UTC
Flink on K8s (operator): how to get Accumulator values
Hi all,
How can I get Accumulator data after a job finishes?
Reference code (it does not return the values):
TableResult execute = statementSet.execute();
Optional<JobClient> jobClient = execute.getJobClient();
jobClient.get().getAccumulators().get();
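PS: The original requirement is to count how much data the job synchronizes. I want to read the exact Accumulator values after the batch job finishes, but on K8s I can't get them. Why?
Any help is greatly appreciated, thanks in advance!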
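A note on the snippet above: it reads the accumulators right after `statementSet.execute()` returns, i.e. while the job is most likely still running, so at best it sees partial values. For a bounded job, one option is to wait for the terminal result via `jobClient.getJobExecutionResult()` and read `JobExecutionResult.getAllAccumulatorResults()` from it. The helper below is a minimal, JDK-only sketch of the "wait, then read one counter" part so it can be tested without a cluster; the class name `FinalAccumulators` and the counter name `syncedRows` are hypothetical. In application mode on Kubernetes, `main()` runs inside the JobManager, so the value probably has to be logged or written to external storage rather than printed on the submitting machine (an assumption about this deployment, not something the post confirms).

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for a job's final accumulator map and extracts one counter. */
final class FinalAccumulators {
    private FinalAccumulators() {}

    /**
     * @param finalAccumulators future that completes with the accumulators of the FINISHED job
     * @param name              accumulator name registered by the job (assumed to be numeric)
     * @param timeout           upper bound on how long to wait for the job to end
     */
    static long counter(CompletableFuture<Map<String, Object>> finalAccumulators,
                        String name, Duration timeout) {
        Map<String, Object> values;
        try {
            // Block until the job has reached a terminal state (or give up after the timeout).
            values = finalAccumulators.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the job result", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("job result not available", e);
        }
        Object value = values.get(name);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("no numeric accumulator named '" + name + "'");
        }
        return ((Number) value).longValue();
    }
}
```

Wired to Flink it would look roughly like `FinalAccumulators.counter(jobClient.getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults), "syncedRows", Duration.ofHours(2))`, assuming the job registers an accumulator under that name.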
Re: Re: Flink async HBase lookup causes a "Too many open files" exception
Posted by aiden <18...@163.com>.
Hi:
I overrode the close method in HbaseDimensionAsyncFunc, but it doesn't seem to take effect. Am I missing something?
@Override
public void close() {
    client.flush();
    client.shutdown();
}
18765295908@163.com
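From: Ran Tao
Sent: 2023-03-08 19:37
To: user-zh
Subject: Re: Flink async HBase lookup causes a "Too many open files" exception
+1, I have run into a similar fd leak. In close(), first flush buffered data, then release the resources, then cancel pending futures.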
Best Regards,
Ran Tao
On Wed, Mar 8, 2023 at 16:52, Weihua Hu <hu...@gmail.com> wrote:
> Hi,
>
> Looking at the code, the job does indeed leak HBaseClient resources on failover.
>
> Override the close method in HbaseDimensionAsyncFunc and release the HBaseClient there.
>
> Best,
> Weihua
>
>
> On Wed, Mar 8, 2023 at 4:19 PM aiden <18...@163.com> wrote:
>
> > Hi
> > When using async HBase I frequently get a "too many open files" exception; after the program restarts automatically it fails again immediately. The error log is:
> > 2023-03-08 16:15:39
> > org.jboss.netty.channel.ChannelException: Failed to create a selector.
> >     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> >     at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> >     at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> >     at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> >     at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> >     at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> >     at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> >     at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> >     at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> >     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> >     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Too many open files
> >     at sun.nio.ch.IOUtil.makePipe(Native Method)
> >     at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> >     at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> >     at java.nio.channels.Selector.open(Selector.java:227)
> >     at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> >     ... 25 more
> > I monitored the number of file descriptors the program uses and found that after it throws the following error and restarts automatically, the fd count surges. The error log:
> > java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> >     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
> >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> >     ... 22 more
> > Caused by: java.util.ConcurrentModificationException
> >     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> >     at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> >     at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> >     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> >     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> >     at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> >     at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> >     at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> >     at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> >     ... 33 more
> >
> > Analyzing the file descriptors, I found that the vast majority of them (15.4k out of 15.9k) look like the entries below. I suspect the descriptors opened before the restart were never released.
> > COMMAND    PID USER   FD    TYPE DEVICE SIZE/OFF      NODE NAME
> > java    168258 yarn *648u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *652r    FIFO    0,9      0t0 850723636 pipe
> > java    168258 yarn *653w    FIFO    0,9      0t0 850723636 pipe
> > java    168258 yarn *654u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *655r    FIFO    0,9      0t0 850723637 pipe
> > java    168258 yarn *656w    FIFO    0,9      0t0 850723637 pipe
> > java    168258 yarn *657u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *658r    FIFO    0,9      0t0 850723638 pipe
> > java    168258 yarn *659w    FIFO    0,9      0t0 850723638 pipe
> > java    168258 yarn *660u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *661w    FIFO    0,9      0t0 850723639 pipe
> >
> > Flink version: 1.16.0
> > asynchbase version: 1.8.2
> > POM:
> > <dependency>
> >     <groupId>org.hbase</groupId>
> >     <artifactId>asynchbase</artifactId>
> >     <version>1.8.2</version>
> >     <exclusions>
> >         <exclusion>
> >             <groupId>org.slf4j</groupId>
> >             <artifactId>*</artifactId>
> >         </exclusion>
> >     </exclusions>
> > </dependency>
> >
> > Key code:
> > SingleOutputStreamOperator asyncFunc = AsyncDataStream
> >         .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
> >         .setParallelism(9)
> >         .uid("asyncFunc");
> >
> > public class HbaseDimensionAsyncFunc extends
> >         RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
> >     HBaseClient client = null;
> >
> >     @Override
> >     public void open(Configuration configuration) throws Exception {
> >         super.open(configuration);
> >         log.warn("========== creating HBase client ==========");
> >         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
> >     }
> >
> >     @Override
> >     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o,
> >                             ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
> >         client.get(new GetRequest(HBASE_TABLE, uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
> >             // business logic
> >         });
> >     }
> > }
> >
> >
>
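On the second trace in the quoted report: the checkpoint fails inside `MapSerializer.copy`, called via `KryoSerializer.copy` from `PartitionableListState.deepCopy`. When a checkpoint is taken, the async wait operator stores its in-flight input records in operator state, and copying one of them iterated a `HashMap` that was being modified at the same moment. The callback body is elided in the post, so this is a guess: if the HBase callback writes into the input `ArrayList<HashMap<String, String>>` from the client's I/O thread while the task thread snapshots it, this is exactly the exception to expect, and the restart it causes is when the fd count was seen to jump. A common remedy is to copy the input on the calling thread (in Flink, `asyncInvoke` runs on the task thread) and let the callback mutate only the copy. Below is a minimal JDK-only sketch; `CopyBeforeMutate` and `enrich` are hypothetical names, and `CompletableFuture` stands in for the HBase callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch: never let another thread mutate objects the operator may snapshot. */
final class CopyBeforeMutate {
    private CopyBeforeMutate() {}

    /** Copies the list and every row map, so the result shares no mutable state with the input. */
    static ArrayList<HashMap<String, String>> deepCopy(ArrayList<HashMap<String, String>> rows) {
        ArrayList<HashMap<String, String>> copy = new ArrayList<>(rows.size());
        for (HashMap<String, String> row : rows) {
            copy.add(new HashMap<>(row));
        }
        return copy;
    }

    /**
     * Adds dimension columns to every row on {@code executor}. The copy is taken on the
     * calling thread, before the hand-off, so the original input is only ever read.
     */
    static CompletableFuture<ArrayList<HashMap<String, String>>> enrich(
            ArrayList<HashMap<String, String>> input,
            Map<String, String> dimension,
            Executor executor) {
        ArrayList<HashMap<String, String>> output = deepCopy(input);
        return CompletableFuture.supplyAsync(() -> {
            for (HashMap<String, String> row : output) {
                row.putAll(dimension); // mutates the private copy only
            }
            return output;
        }, executor);
    }
}
```

In the real function, the completed copy would then be handed to `resultFuture.complete(...)` instead of touching the input tuple.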
Re: Flink异步Hbase导致Too many open files异常
Posted by Ran Tao <ch...@gmail.com>.
+1, I have run into a similar fd leak. In close(), first flush buffered data, then release the resources, then cancel pending futures.
Best Regards,
Ran Tao
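Ran Tao's ordering (flush, release, cancel) can be sketched as below. Two caveats on aiden's close(): in asynchbase, `HBaseClient.shutdown()` returns a `Deferred` and completes asynchronously, so returning from close() right after calling it does not guarantee the client's threads and selectors are gone (`client.shutdown().join()` would block until it finishes); and the first stack trace fails in `HbaseDimTrackerAsyncFunc.open`, not `HbaseDimensionAsyncFunc`, so every async function that creates an `HBaseClient` probably needs the same close(). The sketch is JDK-only: an `ExecutorService` stands in for the client's I/O threads, and `AsyncLookupLifecycle` is a hypothetical class, not the asynchbase or Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an async lookup function that owns a client. */
final class AsyncLookupLifecycle {
    private ExecutorService io;                                  // created in open(), released in close()
    private final List<String> writeBuffer = new ArrayList<>(); // batched writes not yet sent
    private final Set<CompletableFuture<String>> inFlight = ConcurrentHashMap.newKeySet();
    final List<String> flushedWrites = new ArrayList<>();       // writes flushed before shutdown

    synchronized void open() {
        if (io != null) {
            throw new IllegalStateException("already open; call close() first");
        }
        io = Executors.newFixedThreadPool(2);
    }

    synchronized void bufferWrite(String row) {
        writeBuffer.add(row);
    }

    synchronized CompletableFuture<String> lookup(String key) {
        if (io == null) {
            throw new IllegalStateException("not open");
        }
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "dim:" + key, io);
        inFlight.add(f);
        f.whenComplete((value, error) -> inFlight.remove(f)); // forget finished requests
        return f;
    }

    /** Idempotent, and safe to call when open() never ran or failed. */
    synchronized void close() {
        if (io == null) {
            return;
        }
        // 1. Flush buffered data while the client is still usable.
        flushedWrites.addAll(writeBuffer);
        writeBuffer.clear();
        // 2. Release the threads and wait until they have actually stopped.
        io.shutdownNow();
        try {
            io.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 3. Cancel requests that will never complete so no caller waits forever.
        for (CompletableFuture<String> f : inFlight) {
            f.cancel(true);
        }
        inFlight.clear();
        io = null;
    }

    synchronized boolean isReleased() {
        return io == null;
    }
}
```

The null guard matters because Flink may call close() during cleanup even when open() threw, and a failover creates a fresh instance that calls open() again, so anything the old instance did not release stays allocated.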
Re: Flink async HBase lookup causes a "Too many open files" exception
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
Looking at the code, the job does indeed leak HBaseClient resources on failover.
Override the close method in HbaseDimensionAsyncFunc and release the HBaseClient there.
Best,
Weihua
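To check whether a change to close() actually stops the leak, it helps to watch the TaskManager JVM's fd count across restarts. The lsof listing in the original report repeats one `[eventpoll]` entry plus a pipe read/write pair, which matches what the JDK 8 epoll selector allocates per `Selector.open()` (the first trace fails in `IOUtil.makePipe` inside `EPollSelectorImpl.<init>`); at about three descriptors each, 15.4k of them would be roughly 5,000 leaked selectors. The sketch below opens and closes selectors while reading the JVM's own fd count through `com.sun.management.UnixOperatingSystemMXBean`. It assumes a HotSpot-style JVM on Linux or another Unix; `FdLeakProbe` is a hypothetical name, and because the exact number of fds per selector depends on the JDK version, it only relies on the direction of the change.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for observing selector-related fd usage in the current JVM. */
final class FdLeakProbe {
    private FdLeakProbe() {}

    /** Open fd count of this JVM, or -1 where the Unix MXBean is not available. */
    static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1;
    }

    /** Opens n NIO selectors, the kind of object Netty's NIO workers create. */
    static List<Selector> openSelectors(int n) {
        List<Selector> selectors = new ArrayList<>(n);
        try {
            for (int i = 0; i < n; i++) {
                selectors.add(Selector.open());
            }
        } catch (IOException e) {
            closeAll(selectors); // do not leak the ones already opened
            throw new UncheckedIOException(e);
        }
        return selectors;
    }

    /** Closes every selector, continuing past individual failures. */
    static void closeAll(List<Selector> selectors) {
        for (Selector s : selectors) {
            try {
                s.close();
            } catch (IOException ignored) {
                // best effort: keep closing the rest
            }
        }
    }
}
```

Logging `FdLeakProbe.openFds()` from open() and close() of the async function would show whether each restart leaves the count higher than before.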
On Wed, Mar 8, 2023 at 4:19 PM aiden <18...@163.com> wrote:
> Hi
> 我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
> 2023-03-08 16:15:39
> org.jboss.netty.channel.ChannelException: Failed to create a selector.
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> at
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Too many open files
> at sun.nio.ch.IOUtil.makePipe(Native Method)
> at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> at sun.nio.ch
> .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> ... 25 more
> 对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
> java.io.IOException: Could not perform checkpoint 5 for operator async
> wait operator (2/9)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> ... 22 more
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> ... 33 more
>
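> After analyzing the file descriptors, I found that the vast majority of them (15.4k out of 15.9k) look like the ones below. I suspect that the descriptors opened before the restart were never released: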
> COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
> java 168258 yarn *648u a_inode 0,10 0 8670 [eventpoll]
> java 168258 yarn *652r FIFO 0,9 0t0 850723636 pipe
> java 168258 yarn *653w FIFO 0,9 0t0 850723636 pipe
> java 168258 yarn *654u a_inode 0,10 0 8670 [eventpoll]
> java 168258 yarn *655r FIFO 0,9 0t0 850723637 pipe
> java 168258 yarn *656w FIFO 0,9 0t0 850723637 pipe
> java 168258 yarn *657u a_inode 0,10 0 8670 [eventpoll]
> java 168258 yarn *658r FIFO 0,9 0t0 850723638 pipe
> java 168258 yarn *659w FIFO 0,9 0t0 850723638 pipe
> java 168258 yarn *660u a_inode 0,10 0 8670 [eventpoll]
> java 168258 yarn *661w FIFO 0,9 0t0 850723639 pipe
>
> Flink version: 1.16.0
> asynchbase version: 1.8.2
> POM:
> <dependency>
> <groupId>org.hbase</groupId>
> <artifactId>asynchbase</artifactId>
> <version>1.8.2</version>
> <exclusions>
> <exclusion>
> <groupId>org.slf4j</groupId>
> <artifactId>*</artifactId>
> </exclusion>
> </exclusions>
> </dependency>
>
> Key code:
> SingleOutputStreamOperator asyncFunc = AsyncDataStream
>         .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
>         .setParallelism(9)
>         .uid("asyncFunc");
>
> public class HbaseDimensionAsyncFunc extends RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
>     HBaseClient client = null;
>
>     @Override
>     public void open(Configuration configuration) throws Exception {
>         super.open(configuration);
>         log.warn("========== creating HBase client ==========");
>         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
>     }
>
>     @Override
>     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
>
>         client.get(new GetRequest(HBASE_TABLE, uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
>             // business logic
>         });
>     }
> }
>
>
Flink async HBase lookup causes "Too many open files" exception
Posted by aiden <18...@163.com>.
Hi
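I frequently hit a "too many open files" exception when using async HBase. After the job restarts automatically, it fails again immediately. The error log is as follows: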
2023-03-08 16:15:39
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
... 25 more
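I monitored the number of file descriptors used by the job and found that, after it throws the following error and restarts automatically, the descriptor count jumps sharply. The error log is as follows: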
java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
... 22 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
... 33 more
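The innermost cause in this trace is a `java.util.ConcurrentModificationException` thrown while Kryo copies a `HashMap` that sits inside a pending input record (`Tuple2<String, ArrayList<HashMap<String, String>>>`) as the async wait operator snapshots its operator state. One plausible explanation, not confirmed in the thread, is that the elided business code in the HBase callback writes into those same input maps while the task thread is copying them for the checkpoint. The stdlib-only sketch below reproduces the exception deterministically on a single thread and shows a defensive-copy alternative, where the output is built from fresh maps instead of mutating the input. The `enrich` method and the `hbase_value` key are hypothetical.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InputMutationDemo {

    // Adds a new key while iterating the same HashMap. HashMap iterators are fail-fast, so with
    // two or more entries the following next() throws. In the job, the "iteration" would be
    // Kryo copying the map during a checkpoint and the write would come from the callback thread.
    public static boolean mutateWhileIterating(Map<String, String> row) {
        try {
            for (String key : row.keySet()) {
                row.put("hbase_value", "v"); // hypothetical enrichment written into the input
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Defensive-copy alternative: treat the input record as read-only and emit new maps.
    public static ArrayList<HashMap<String, String>> enrich(List<HashMap<String, String>> input, String hbaseValue) {
        ArrayList<HashMap<String, String>> out = new ArrayList<>(input.size());
        for (HashMap<String, String> row : input) {
            HashMap<String, String> copy = new HashMap<>(row); // the input map is never modified
            copy.put("hbase_value", hbaseValue);
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        HashMap<String, String> row = new HashMap<>();
        row.put("uuid", "u-1");
        row.put("event", "login");

        System.out.println("CME when writing during iteration: " + mutateWhileIterating(new HashMap<>(row)));

        ArrayList<HashMap<String, String>> input = new ArrayList<>();
        input.add(row);
        ArrayList<HashMap<String, String>> output = enrich(input, "v1");
        System.out.println("input size=" + input.get(0).size() + ", output size=" + output.get(0).size());
    }
}
```

In the real job the copy and the write happen on different threads, so the failure is timing-dependent rather than guaranteed as it is here.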
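After analyzing the file descriptors, I found that the vast majority of them (15.4k out of 15.9k) look like the ones below. I suspect that the descriptors opened before the restart were never released: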
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 168258 yarn *648u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *652r FIFO 0,9 0t0 850723636 pipe
java 168258 yarn *653w FIFO 0,9 0t0 850723636 pipe
java 168258 yarn *654u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *655r FIFO 0,9 0t0 850723637 pipe
java 168258 yarn *656w FIFO 0,9 0t0 850723637 pipe
java 168258 yarn *657u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *658r FIFO 0,9 0t0 850723638 pipe
java 168258 yarn *659w FIFO 0,9 0t0 850723638 pipe
java 168258 yarn *660u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *661w FIFO 0,9 0t0 850723639 pipe
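The listing above is consistent with leaked NIO selectors: the stack trace shows `EPollSelectorImpl.<init>` calling `IOUtil.makePipe`, and on JDK 8 for Linux each such selector holds an `[eventpoll]` descriptor plus the read and write ends of a pipe, which matches the repeating eventpoll/pipe/pipe groups. A minimal stdlib sketch for watching this from inside the JVM, assuming a HotSpot/OpenJDK runtime on Unix where `com.sun.management.UnixOperatingSystemMXBean` is available: it opens a few selectors, shows the open-descriptor count rising, then closes them and shows it falling. The exact number of descriptors per selector depends on the JDK version and OS, so the sketch only compares counts.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

public class SelectorFdDemo {

    // Returns the JVM's open file descriptor count, or -1 if the Unix-specific bean is unavailable.
    public static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1;
    }

    // Opens n selectors, samples the count, closes them, samples again: {before, afterOpen, afterClose}.
    public static long[] measure(int n) throws IOException {
        long before = openFds();
        List<Selector> selectors = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            selectors.add(Selector.open()); // each selector holds OS descriptors until close()
        }
        long afterOpen = openFds();
        for (Selector s : selectors) {
            s.close(); // the step a leaked client never reaches
        }
        long afterClose = openFds();
        return new long[] {before, afterOpen, afterClose};
    }

    public static void main(String[] args) throws IOException {
        long[] r = measure(10);
        System.out.printf("before=%d afterOpen=%d afterClose=%d%n", r[0], r[1], r[2]);
    }
}
```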
Flink version: 1.16.0
asynchbase version: 1.8.2
POM:
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Key code:
SingleOutputStreamOperator asyncFunc = AsyncDataStream
        .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
        .setParallelism(9)
        .uid("asyncFunc");

public class HbaseDimensionAsyncFunc extends RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
    HBaseClient client = null;

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        log.warn("========== creating HBase client ==========");
        client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
    }

    @Override
    public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
        client.get(new GetRequest(HBASE_TABLE, uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
            // business logic
        });
    }
}
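The function above creates an `HBaseClient` in `open()` but never releases it, so each failover creates a new client while the previous one may keep its Netty selector threads and descriptors. The replies in this thread suggest overriding `close()` to release the client (flush buffered requests, close resources, cancel pending futures). Below is a stdlib-only sketch of that lifecycle under stated assumptions: `FakeAsyncClient` is a hypothetical stand-in for `HBaseClient` whose `shutdown()` is asynchronous, as asynchbase's is (it returns a `Deferred`), and `LookupFunction` only mirrors the `open()`/`close()` pair of a `RichAsyncFunction` without Flink types. The point illustrated is that `close()` waits, with a timeout, for shutdown to finish instead of firing it and returning. With the real library the analogous call would presumably be `client.shutdown().join(timeoutMillis)`; check that against the asynchbase version in use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LookupLifecycleSketch {

    // Hypothetical stand-in for an async client: shutdown() only starts the shutdown,
    // and the returned future completes later on another thread.
    public static class FakeAsyncClient {
        private final AtomicBoolean released = new AtomicBoolean(false);

        public CompletableFuture<Void> shutdown() {
            return CompletableFuture.runAsync(() -> {
                sleepQuietly(50);   // simulate flushing buffered requests
                released.set(true); // simulate releasing selectors and worker threads
            });
        }

        public boolean isReleased() {
            return released.get();
        }
    }

    // Mirrors the open()/close() pair of a RichAsyncFunction (Flink types omitted).
    public static class LookupFunction {
        private FakeAsyncClient client;

        public void open() {
            client = new FakeAsyncClient();
        }

        public void close() throws Exception {
            if (client == null) {
                return; // open() may have failed, or close() was already called
            }
            try {
                // Block until shutdown completes, bounded by a timeout, so the old client's
                // resources are gone before the task is restarted.
                client.shutdown().get(30, TimeUnit.SECONDS);
            } finally {
                client = null;
            }
        }

        public FakeAsyncClient getClient() {
            return client;
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        LookupFunction f = new LookupFunction();
        f.open();
        FakeAsyncClient c = f.getClient();
        f.close();
        System.out.println("released after close(): " + c.isReleased());
    }
}
```

Clearing the field in `finally` keeps a second `close()` call harmless, which matters if the runtime closes the function more than once during a failed restart.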
Re: Re: flink on K8S(operator): how to get Accumulator values
Posted by Shammon FY <zj...@gmail.com>.
Hi
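As mentioned above, jobClient.get().getAccumulators() fetches the job's information from the Flink cluster. In application mode, the Flink cluster also exits once the job finishes. You can work around this in other ways: run the job on a session cluster, start a History Server, or export the values to another system through custom metrics.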
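A sketch of the History Server route, assuming the JobManager archives finished jobs (`jobmanager.archive.fs.dir`) and a History Server reads that location (`historyserver.archive.fs.dir`): the History Server serves archived jobs under the same REST paths as a running cluster, so accumulators can be read with a plain HTTP GET on `/jobs/<jobId>/accumulators` after the application cluster has exited. The host, port, and job ID are placeholders; to run standalone, the example starts a local stub server in place of a real History Server, and it returns the raw JSON rather than assuming a particular response layout.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class AccumulatorFetchSketch {

    // GET <baseUrl>/jobs/<jobId>/accumulators and return the raw JSON body, unparsed.
    public static String fetchAccumulators(String baseUrl, String jobId) throws IOException, InterruptedException {
        HttpClient http = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/accumulators"))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("HTTP " + response.statusCode() + " for job " + jobId);
        }
        return response.body();
    }

    // Local stub standing in for a History Server so the sketch runs without Flink.
    public static HttpServer startStub(String jobId, String body) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/jobs/" + jobId + "/accumulators", exchange -> {
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        String jobId = "0123456789abcdef0123456789abcdef"; // placeholder job ID
        HttpServer stub = startStub(jobId, "{\"stub\":true}"); // fake body, not Flink's format
        try {
            String base = "http://127.0.0.1:" + stub.getAddress().getPort();
            System.out.println(fetchAccumulators(base, jobId));
        } finally {
            stub.stop(0);
        }
    }
}
```

Against a real deployment the call would look like `fetchAccumulators("http://<history-server-host>:8082", jobId)`, 8082 being the default `historyserver.web.port`. Archives appear only after the job reaches a terminal state and the History Server has refreshed, so a caller may need to retry.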
Best,
Shammon
On Tue, Mar 7, 2023 at 11:27 PM 李银苗 <ft...@126.com> wrote:
> Unsubscribe
Re:Re: flink on K8S(operator): how to get Accumulator values
Posted by 李银苗 <ft...@126.com>.
Unsubscribe
Re: flink on K8S(operator): how to get Accumulator values
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
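From your description, I'd guess you are using Application mode? In this mode the user code runs on the JobManager side, and the cluster is shut down as soon as the job finishes.
You can try deploying the cluster in session mode [1].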
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
Best,
Weihua
On Mon, Mar 6, 2023 at 8:54 PM wangwei <ga...@163.com> wrote:
>
> Hi all,
>
> How can I get the Accumulator data after a job finishes?
> Reference code (it does not return the values):
> TableResult execute = statementSet.execute();
> Optional<JobClient> jobClient = execute.getJobClient();
> jobClient.get().getAccumulators().get()
>
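> PS: The original requirement is to count how much data the job synchronized. I want to read the Accumulator values accurately after the batch job finishes, but on K8S they cannot be retrieved. Why?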
>
> Any help is much appreciated!! Thanks in advance.
>