You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by aiden <18...@163.com> on 2023/03/08 08:19:14 UTC

Flink异步Hbase导致Too many open files异常

Hi
  我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
2023-03-08 16:15:39
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
... 25 more
  对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
... 22 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
... 33 more

对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放
COMMAND    PID USER   FD      TYPE             DEVICE  SIZE/OFF       NODE NAME
java    168258 yarn *648u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *652r     FIFO                0,9       0t0  850723636 pipe
java    168258 yarn *653w     FIFO                0,9       0t0  850723636 pipe
java    168258 yarn *654u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *655r     FIFO                0,9       0t0  850723637 pipe
java    168258 yarn *656w     FIFO                0,9       0t0  850723637 pipe
java    168258 yarn *657u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *658r     FIFO                0,9       0t0  850723638 pipe
java    168258 yarn *659w     FIFO                0,9       0t0  850723638 pipe
java    168258 yarn *660u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *661w     FIFO                0,9       0t0  850723639 pipe

flink版本:1.16.0
asynchbase版本:1.8.2
附POM:
        <dependency>
            <groupId>org.hbase</groupId>
            <artifactId>asynchbase</artifactId>
            <version>1.8.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

关键代码如下:
        SingleOutputStreamOperator asyncFunc = AsyncDataStream
                .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
                .setParallelism(9)
                .uid("asyncFunc");

public class HbaseDimensionAsyncFunc extends RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
    HBaseClient client = null;

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        log.warn("==========创建hbase客户端==========");
        client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
    }

    @Override
    public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
      
            client.get(new GetRequest(HBASE_TABLE, uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
            // 业务代码
            });
    }
}


Re: Re: Flink异步Hbase导致Too many open files异常

Posted by aiden <18...@163.com>.
Hi:
  我在HbaseDimensionAsyncFunc 中重写了close方法,发现并没有生效,是我遗漏了什么吗?
    @Override
    public void close(){
        client.flush();
        client.shutdown();
    }



18765295908@163.com
 
发件人: Ran Tao
发送时间: 2023-03-08 19:37
收件人: user-zh
主题: Re: Flink异步Hbase导致Too many open files异常
+1 有遇到过类似 fd 泄露的问题。注意 close 的时候buffer 数据刷盘, 然后资源关闭,future cancel。
 
Best Regards,
Ran Tao
 
 
Weihua Hu <hu...@gmail.com> 于2023年3月8日周三 16:52写道:
 
> Hi,
>
> 通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。
>
> 在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。
>
> Best,
> Weihua
>
>
> On Wed, Mar 8, 2023 at 4:19 PM aiden <18...@163.com> wrote:
>
> > Hi
> >   我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
> > 2023-03-08 16:15:39
> > org.jboss.netty.channel.ChannelException: Failed to create a selector.
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> > at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> > at
> org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> > at
> >
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch
> > .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at
> >
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> > ... 25 more
> >   对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
> > java.io.IOException: Could not perform checkpoint 5 for operator async
> > wait operator (2/9)#0.
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> > at org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> > at org.apache.flink.streaming.runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
> > not complete snapshot 5 for operator async wait operator (2/9)#0. Failure
> > reason: Checkpoint was declined.
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> > ... 22 more
> > Caused by: java.util.ConcurrentModificationException
> > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at
> >
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> > at
> >
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> > at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> > at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> > at
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> > at
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> > at
> >
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> > ... 33 more
> >
> > 对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放
> > COMMAND    PID USER   FD      TYPE             DEVICE  SIZE/OFF
>  NODE
> > NAME
> > java    168258 yarn *648u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *652r     FIFO                0,9       0t0
> 850723636
> > pipe
> > java    168258 yarn *653w     FIFO                0,9       0t0
> 850723636
> > pipe
> > java    168258 yarn *654u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *655r     FIFO                0,9       0t0
> 850723637
> > pipe
> > java    168258 yarn *656w     FIFO                0,9       0t0
> 850723637
> > pipe
> > java    168258 yarn *657u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *658r     FIFO                0,9       0t0
> 850723638
> > pipe
> > java    168258 yarn *659w     FIFO                0,9       0t0
> 850723638
> > pipe
> > java    168258 yarn *660u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *661w     FIFO                0,9       0t0
> 850723639
> > pipe
> >
> > flink版本:1.16.0
> > asynchbase版本:1.8.2
> > 附POM:
> >         <dependency>
> >             <groupId>org.hbase</groupId>
> >             <artifactId>asynchbase</artifactId>
> >             <version>1.8.2</version>
> >             <exclusions>
> >                 <exclusion>
> >                     <groupId>org.slf4j</groupId>
> >                     <artifactId>*</artifactId>
> >                 </exclusion>
> >             </exclusions>
> >         </dependency>
> >
> > 关键代码如下:
> >         SingleOutputStreamOperator asyncFunc = AsyncDataStream
> >                 .orderedWaitWithRetry(source, new
> > HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
> >                 .setParallelism(9)
> >                 .uid("asyncFunc");
> >
> > public class HbaseDimensionAsyncFunc extends
> > RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>,
> > ArrayList<HashMap<String, String>>> {
> >     HBaseClient client = null;
> >
> >     @Override
> >     public void open(Configuration configuration) throws Exception {
> >         super.open(configuration);
> >         log.warn("==========创建hbase客户端==========");
> >         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
> >     }
> >
> >     @Override
> >     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String,
> > String>>> o, ResultFuture<ArrayList<HashMap<String, String>>>
> resultFuture)
> > throws Exception {
> >
> >             client.get(new GetRequest(HBASE_TABLE,
> > uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
> >             // 业务代码
> >             });
> >     }
> > }
> >
> >
>

Re: Flink异步Hbase导致Too many open files异常

Posted by Ran Tao <ch...@gmail.com>.
+1 有遇到过类似 fd 泄露的问题。注意 close 的时候buffer 数据刷盘, 然后资源关闭,future cancel。

Best Regards,
Ran Tao


Weihua Hu <hu...@gmail.com> 于2023年3月8日周三 16:52写道:

> Hi,
>
> 通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。
>
> 在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。
>
> Best,
> Weihua
>
>
> On Wed, Mar 8, 2023 at 4:19 PM aiden <18...@163.com> wrote:
>
> > Hi
> >   我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
> > 2023-03-08 16:15:39
> > org.jboss.netty.channel.ChannelException: Failed to create a selector.
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> > at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> > at
> org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> > at
> >
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch
> > .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at
> >
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> > ... 25 more
> >   对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
> > java.io.IOException: Could not perform checkpoint 5 for operator async
> > wait operator (2/9)#0.
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> > at org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> > at org.apache.flink.streaming.runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
> > not complete snapshot 5 for operator async wait operator (2/9)#0. Failure
> > reason: Checkpoint was declined.
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> > ... 22 more
> > Caused by: java.util.ConcurrentModificationException
> > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at
> >
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> > at
> >
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> > at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> > at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> > at
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> > at
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> > at
> >
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> > at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> > ... 33 more
> >
> > 对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放
> > COMMAND    PID USER   FD      TYPE             DEVICE  SIZE/OFF
>  NODE
> > NAME
> > java    168258 yarn *648u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *652r     FIFO                0,9       0t0
> 850723636
> > pipe
> > java    168258 yarn *653w     FIFO                0,9       0t0
> 850723636
> > pipe
> > java    168258 yarn *654u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *655r     FIFO                0,9       0t0
> 850723637
> > pipe
> > java    168258 yarn *656w     FIFO                0,9       0t0
> 850723637
> > pipe
> > java    168258 yarn *657u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *658r     FIFO                0,9       0t0
> 850723638
> > pipe
> > java    168258 yarn *659w     FIFO                0,9       0t0
> 850723638
> > pipe
> > java    168258 yarn *660u  a_inode               0,10         0
>  8670
> > [eventpoll]
> > java    168258 yarn *661w     FIFO                0,9       0t0
> 850723639
> > pipe
> >
> > flink版本:1.16.0
> > asynchbase版本:1.8.2
> > 附POM:
> >         <dependency>
> >             <groupId>org.hbase</groupId>
> >             <artifactId>asynchbase</artifactId>
> >             <version>1.8.2</version>
> >             <exclusions>
> >                 <exclusion>
> >                     <groupId>org.slf4j</groupId>
> >                     <artifactId>*</artifactId>
> >                 </exclusion>
> >             </exclusions>
> >         </dependency>
> >
> > 关键代码如下:
> >         SingleOutputStreamOperator asyncFunc = AsyncDataStream
> >                 .orderedWaitWithRetry(source, new
> > HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
> >                 .setParallelism(9)
> >                 .uid("asyncFunc");
> >
> > public class HbaseDimensionAsyncFunc extends
> > RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>,
> > ArrayList<HashMap<String, String>>> {
> >     HBaseClient client = null;
> >
> >     @Override
> >     public void open(Configuration configuration) throws Exception {
> >         super.open(configuration);
> >         log.warn("==========创建hbase客户端==========");
> >         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
> >     }
> >
> >     @Override
> >     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String,
> > String>>> o, ResultFuture<ArrayList<HashMap<String, String>>>
> resultFuture)
> > throws Exception {
> >
> >             client.get(new GetRequest(HBASE_TABLE,
> > uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
> >             // 业务代码
> >             });
> >     }
> > }
> >
> >
>

Re: Flink异步Hbase导致Too many open files异常

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。

在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。

Best,
Weihua


On Wed, Mar 8, 2023 at 4:19 PM aiden <18...@163.com> wrote:

> Hi
>   我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下:
> 2023-03-08 16:15:39
> org.jboss.netty.channel.ChannelException: Failed to create a selector.
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> at
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Too many open files
> at sun.nio.ch.IOUtil.makePipe(Native Method)
> at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> at sun.nio.ch
> .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> ... 25 more
>   对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
> java.io.IOException: Could not perform checkpoint 5 for operator async
> wait operator (2/9)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
> not complete snapshot 5 for operator async wait operator (2/9)#0. Failure
> reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> ... 22 more
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> ... 33 more
>
> 对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放
> COMMAND    PID USER   FD      TYPE             DEVICE  SIZE/OFF       NODE
> NAME
> java    168258 yarn *648u  a_inode               0,10         0       8670
> [eventpoll]
> java    168258 yarn *652r     FIFO                0,9       0t0  850723636
> pipe
> java    168258 yarn *653w     FIFO                0,9       0t0  850723636
> pipe
> java    168258 yarn *654u  a_inode               0,10         0       8670
> [eventpoll]
> java    168258 yarn *655r     FIFO                0,9       0t0  850723637
> pipe
> java    168258 yarn *656w     FIFO                0,9       0t0  850723637
> pipe
> java    168258 yarn *657u  a_inode               0,10         0       8670
> [eventpoll]
> java    168258 yarn *658r     FIFO                0,9       0t0  850723638
> pipe
> java    168258 yarn *659w     FIFO                0,9       0t0  850723638
> pipe
> java    168258 yarn *660u  a_inode               0,10         0       8670
> [eventpoll]
> java    168258 yarn *661w     FIFO                0,9       0t0  850723639
> pipe
>
> flink版本:1.16.0
> asynchbase版本:1.8.2
> 附POM:
>         <dependency>
>             <groupId>org.hbase</groupId>
>             <artifactId>asynchbase</artifactId>
>             <version>1.8.2</version>
>             <exclusions>
>                 <exclusion>
>                     <groupId>org.slf4j</groupId>
>                     <artifactId>*</artifactId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>
> 关键代码如下:
>         SingleOutputStreamOperator asyncFunc = AsyncDataStream
>                 .orderedWaitWithRetry(source, new
> HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
>                 .setParallelism(9)
>                 .uid("asyncFunc");
>
> public class HbaseDimensionAsyncFunc extends
> RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>,
> ArrayList<HashMap<String, String>>> {
>     HBaseClient client = null;
>
>     @Override
>     public void open(Configuration configuration) throws Exception {
>         super.open(configuration);
>         log.warn("==========创建hbase客户端==========");
>         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
>     }
>
>     @Override
>     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String,
> String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture)
> throws Exception {
>
>             client.get(new GetRequest(HBASE_TABLE,
> uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
>             // 业务代码
>             });
>     }
> }
>
>