Posted to issues@flink.apache.org by "王康 (Jira)" <ji...@apache.org> on 2022/01/06 06:13:00 UTC

[jira] [Updated] (FLINK-25539) Create a connector with Flink's BatchTableEnvironment to read OSS files. The parallelism is set to 16. When reading, a thread sometimes fails with the error: Null IO stream

     [ https://issues.apache.org/jira/browse/FLINK-25539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

王康 updated FLINK-25539:
-----------------------
    Description: 
Null IO stream: with Flink version 1.13.2, a FileSystem connector is used to read OSS files.

The BatchTableEnvironment parallelism is set to 16 and the cluster has three machine nodes. One of the nodes sometimes throws "Null IO stream" while reading, which causes the job to fail.
{code:java}
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

fbEnv.setParallelism(16);
// Register a temporary CSV table backed by the OSS path
tableEnv.connect(new FileSystem().path(ossPath))
        .withFormat(new Csv().fieldDelimiter(allTable.getSeparator().charAt(0)))
        .withSchema(schema)
        .createTemporaryTable(ossTableName);
{code}
{code:java}
// The full error reported by Flink follows
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Null IO stream
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSInputStream.reopen(AliyunOSSInputStream.java:176)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSInputStream.read(AliyunOSSInputStream.java:235)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.flink.fs.osshadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
	at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
	at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:745)
{code}

  was:
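Null IO stream: with Flink version 1.13.2, a connector is used to read OSS files.

The BatchTableEnvironment parallelism is set to 16 on three machine nodes. One of the nodes sometimes throws "Null IO stream" while reading, which causes the job to fail.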

        Summary: Create a connector with Flink's BatchTableEnvironment to read OSS files. The parallelism is set to 16. When reading, a thread sometimes fails with the error: Null IO stream  (was: Using Flink's BatchTableEnvironment to create a connector that reads OSS files, with parallelism set to 16, a thread sometimes reports an error while reading: Null IO stream)

> Create a connector with Flink's BatchTableEnvironment to read OSS files. The parallelism is set to 16. When reading, a thread sometimes fails with the error: Null IO stream
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25539
>                 URL: https://issues.apache.org/jira/browse/FLINK-25539
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.2
>         Environment: Linux: Flink TaskManagers deployed on three machines, 16 cores, 32 GB memory
>            Reporter: 王康
>            Priority: Major


