Posted to issues@flink.apache.org by "Haoze Wu (Jira)" <ji...@apache.org> on 2023/04/06 18:42:00 UTC

[jira] [Updated] (FLINK-31746) Batch workload output completes while the job client fails

     [ https://issues.apache.org/jira/browse/FLINK-31746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haoze Wu updated FLINK-31746:
-----------------------------
    Description: 
We are testing Flink-1.14.0. (We know 1.14.0 is no longer supported, so we are also testing Flink-1.17.0 to see whether it has the same issue.) We run a batch processing job whose input is a file on disk and whose output is a Kafka topic, which should receive 170 messages when the workload finishes. During testing, we inject a fault (an IOException) into a taskmanager, and the batch processing job client then fails:
{code:java}
2023-03-26T19:05:48,922 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 85c9bd56d6dd111f858b4b5a99551c53) {code}
The IOException occurs in `BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader` when running `FileChannel.open`. This call site can be reached multiple times within a single workload.
{code:java}
    FileRegionReader(Path filePath) throws IOException {
        this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
        this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
    }
 {code}
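As a minimal sketch of how this call can throw, assume the simplest trigger: the file to open does not exist. The class `FileChannelOpenDemo`, the helper `tryOpen`, and the temporary file names are hypothetical and not part of Flink; the fault injected in our test may differ from this one.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChannelOpenDemo {

    /**
     * Opens the file for reading, as the FileRegionReader constructor does.
     * Returns "opened" on success, or the simple class name of the IOException.
     */
    static String tryOpen(Path filePath) {
        try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ)) {
            return "opened";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        // A file that exists, standing in for a partition data file.
        Path existing = Files.createTempFile("partition", ".data");
        // A sibling path that was never created.
        Path missing = existing.resolveSibling(existing.getFileName() + ".missing");

        System.out.println(tryOpen(existing));
        System.out.println(tryOpen(missing));

        Files.deleteIfExists(existing);
    }
}
```

In this sketch the first call succeeds and the second one reports a `NoSuchFileException`, which is a subclass of `IOException`.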
The call stack of this fault site:
{code:java}
(org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader,<init>,200), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader,<init>,74), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition,createReadView,221), (org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition,createSubpartitionView,205), (org.apache.flink.runtime.io.network.partition.ResultPartitionManager,createSubpartitionView,76), (org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel,requestSubpartition,133), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,internalRequestPartitions,330), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,requestPartitions,299), (org.apache.flink.runtime.taskmanager.InputGateWithMetrics,requestPartitions,127), (org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1,runThrowing,50),
(org.apache.flink.streaming.runtime.tasks.mailbox.Mail,run,90), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMailsNonBlocking,353), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMail,319), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,runMailboxLoop,201), (org.apache.flink.streaming.runtime.tasks.StreamTask,runMailboxLoop,809),
(org.apache.flink.streaming.runtime.tasks.StreamTask,invoke,761),
(org.apache.flink.runtime.taskmanager.Task,runWithSystemExitMonitoring,958),
(org.apache.flink.runtime.taskmanager.Task,restoreAndInvoke,937),
(org.apache.flink.runtime.taskmanager.Task,doRun,766),
(org.apache.flink.runtime.taskmanager.Task,run,575),
(java.lang.Thread,run,748) {code}
We inspected the names of the threads in which the fault occurs and found that our workload is divided into these tasks:

Split Reader: Custom File Source -> Flat Map (1/8)#0
...
Split Reader: Custom File Source -> Flat Map (8/8)#0
Keyed Aggregation -> Map -> Sink Unnamed Writer (1/8)#0
...
Keyed Aggregation -> Map -> Sink Unnamed Writer (8/8)#0
Sink Unnamed Committer (1/1)#0

 

A fault during “Split Reader” or “Keyed Aggregation” triggers this “Job failed” message, and our Kafka topic does not receive the complete, correct output (i.e., it receives fewer than 170 messages). However, if the exception happens during “Sink Unnamed Committer”, the client still reports “Job failed” even though our Kafka topic has already received the complete output.

We assume that our workload is translated into a few steps: “Custom File Source -> Flat Map”, “Keyed Aggregation -> Map -> Sink Unnamed Writer”, and “Sink Unnamed Committer”. The last one appears to be responsible for some kind of “commit” that does not affect our end-to-end results. However, a fault in the “commit” stage is still reported to the job client as a failure, which can be confusing on the client side.

We have some questions about the design rationale:
 # In some workloads, such as ours, the final “commit” does not seem to matter much. Can a failure there be treated as tolerable?
 # The client log is confusing. It shows many exceptions, but it does not show in which stage of the workload the failure happened. The most useful information for the client would be something like “Sink Unnamed Committer (1/1)#0 (7b19f0a2f247b8f38fe9141c9872ef58) switched from RUNNING to FAILED”, which is not shown.

P.S. The complete failure log of the job client is:
{code:java}
2023-04-03T11:36:25,464 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.0.jar:1.14.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
        at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayBatchClientMain.run(FlinkGrayBatchClientMain.java:69) ~[?:?]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.run(FlinkGrayClientMain.java:66) ~[?:?]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.main(FlinkGrayClientMain.java:92) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_221]
Caused by: java.io.IOException
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:229) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:226) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.createSubpartitionView(BufferWritingResultPartition.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:76) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:133) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:330) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:299) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:127) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:358) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:322) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221] {code}
We feel that the job client should probably improve its logging by adding more details about the failure, such as the name of the failed task (here, “Sink Unnamed Committer”).
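To illustrate what a shorter client-side message could look like, here is a minimal sketch that reduces a nested “Caused by” chain to its innermost exception. The class `RootCauseDemo` and its methods are hypothetical helpers, not Flink APIs, and the plain JDK exceptions in `main` are only stand-ins for the Flink exception types in the log. Note that this alone does not recover the task name, which is not present in the client-side chain.

```java
import java.io.IOException;

public class RootCauseDemo {

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Builds a one-line summary: outermost message, root cause type, and its first stack frame. */
    static String summarize(Throwable t) {
        Throwable root = rootCause(t);
        StackTraceElement[] frames = root.getStackTrace();
        String where = frames.length > 0 ? frames[0].toString() : "unknown location";
        return t.getMessage() + " <- root cause: " + root.getClass().getName() + " at " + where;
    }

    public static void main(String[] args) {
        // Stand-ins for the chain in the log: IOException wrapped twice.
        Exception io = new IOException();
        Exception job = new RuntimeException("Recovery is suppressed", io);
        Exception client = new RuntimeException("Job failed", job);

        System.out.println(summarize(client));
    }
}
```

Applied to the log above, such a summary would point at the `java.io.IOException` thrown in the `FileRegionReader` constructor rather than at the outer `ProgramInvocationException` wrappers.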

We are also checking Flink-1.17.0 to see if it has this issue.

  was:
We are doing testing on Flink-1.14.0 (We know 1.14.0 is not supported now so we are also testing Flink-1.17.0 to see if it has the same issue). We run a batch processing job. The input of the job is a file in the disk; the output of the job is a Kafka topic, which should receive 170 messages when the workload finishes. In the testing, we introduce a fault (an IOException) in a taskmanager, then the batch processing job client fails:

 
{code:java}
2023-03-26T19:05:48,922 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 85c9bd56d6dd111f858b4b5a99551c53) {code}
The IOException occurs in `BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader` when running `FileChannel.open`. It has multiple chances to occur in a workload.

 

 
{code:java}
    FileRegionReader(Path filePath) throws IOException {
        this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
        this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
    }
 {code}
The call stack of this fault site:
{code:java}
(org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader,<init>,200), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader,<init>,74), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition,createReadView,221), (org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition,createSubpartitionView,205), (org.apache.flink.runtime.io.network.partition.ResultPartitionManager,createSubpartitionView,76), (org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel,requestSubpartition,133), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,internalRequestPartitions,330), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,requestPartitions,299), (org.apache.flink.runtime.taskmanager.InputGateWithMetrics,requestPartitions,127), (org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1,runThrowing,50),
(org.apache.flink.streaming.runtime.tasks.mailbox.Mail,run,90), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMailsNonBlocking,353), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMail,319), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,runMailboxLoop,201), (org.apache.flink.streaming.runtime.tasks.StreamTask,runMailboxLoop,809),
(org.apache.flink.streaming.runtime.tasks.StreamTask,invoke,761),
(org.apache.flink.runtime.taskmanager.Task,runWithSystemExitMonitoring,958),
(org.apache.flink.runtime.taskmanager.Task,restoreAndInvoke,937),
(org.apache.flink.runtime.taskmanager.Task,doRun,766),
(org.apache.flink.runtime.taskmanager.Task,run,575),
(java.lang.Thread,run,748) {code}
 

We inspect the name of the threads where the fault occurs, we find that our workload can be divided into these tasks:

Split Reader: Custom File Source -> Flat Map (1/8)#0
...
Split Reader: Custom File Source -> Flat Map (8/8)#0
Keyed Aggregation -> Map -> Sink Unnamed Writer (1/8)#0
...
Keyed Aggregation -> Map -> Sink Unnamed Writer (8/8)#0
Sink Unnamed Committer (1/1)#0

Such fault during “Split Reader” or “Keyed Aggregation” will trigger this “Job failed” message and our Kafka topic can’t receive the complete correct output (i.e., less than 170 messages). However, if the exception happens during “Sink Unnamed Committer”, the client still recognizes the “Job failed”, while our Kafka topic already completely got what it wants.

We assume that our workload is translated into a few steps: “Custom File Source -> Flat Map”, “Keyed Aggregation -> Map -> Sink Unnamed Writer”, and “Sink Unnamed Committer”. The last one is responsible for some “commit” for it does not affect our end-to-end results. However, the fault in the “commit” stage still reports a “failure” to the job client, while the job client may get confused.

We have some questions about the design rationales:
 # In some workloads such as our case, the “commit” at last seems not to matter that much. Can it be seen as tolerable?
 # The client log is confusing. It shows tons of exceptions but it does not show in which stage of the workload the failure happens. The most useful information for the client is something like “Sink Unnamed Committer (1/1)#0 (7b19f0a2f247b8f38fe9141c9872ef58) switched from RUNNING to FAILED”, which is not shown.

P.S. The complete failure log of the job client is:
{code:java}
2023-04-03T11:36:25,464 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.0.jar:1.14.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
        at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayBatchClientMain.run(FlinkGrayBatchClientMain.java:69) ~[?:?]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.run(FlinkGrayClientMain.java:66) ~[?:?]
        at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.main(FlinkGrayClientMain.java:92) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_221]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_221]
Caused by: java.io.IOException
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:229) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:226) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.createSubpartitionView(BufferWritingResultPartition.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:76) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:133) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:330) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:299) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:127) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:358) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:322) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221] {code}
We feel that the job client should improve its logging by adding more details about the failure, such as which task failed (here, “Sink Unnamed Committer”).
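
As a rough illustration of the kind of condensed output we have in mind, the sketch below walks an exception's cause chain and prints one line per cause plus the root cause, instead of the full stack traces. It uses only the JDK. `CauseChainSummary`, `summarize`, `rootCause`, and `sampleChain` are hypothetical names, and the sample chain uses plain `RuntimeException` stand-ins for the Flink exception classes seen in the log, so this is not how the Flink client is implemented. Note that none of the messages in the chain names the failed task, so a summary like this would still need the task name from the JobManager side.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: condenses a nested exception
// into one line per cause so the root cause is easy to spot.
final class CauseChainSummary {

    /** Follows getCause() until it returns null and returns the innermost throwable. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns one "SimpleClassName: message" entry per link, outermost first. */
    static List<String> summarize(Throwable t) {
        List<String> lines = new ArrayList<>();
        for (Throwable current = t; current != null; current = current.getCause()) {
            lines.add(current.getClass().getSimpleName() + ": " + current.getMessage());
        }
        return lines;
    }

    /** Builds a stand-in chain with the same messages as the client log (assumed shape). */
    static Throwable sampleChain() {
        // The injected IOException in the log carries no message.
        IOException root = new IOException();
        RuntimeException suppressed =
                new RuntimeException("Recovery is suppressed by NoRestartBackoffTimeStrategy", root);
        RuntimeException execution = new RuntimeException("Job execution failed.", suppressed);
        return new RuntimeException(
                "Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)", execution);
    }

    public static void main(String[] args) {
        Throwable failure = sampleChain();
        summarize(failure).forEach(System.out::println);
        System.out.println("Root cause: " + rootCause(failure).getClass().getName());
    }
}
```

A real client would pass the exception it caught from the job execution call to `summarize` rather than building a sample chain.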

We are also checking Flink-1.17.0 to see if it has this issue.


> Batch workload output completes while the job client fails
> ----------------------------------------------------------
>
>                 Key: FLINK-31746
>                 URL: https://issues.apache.org/jira/browse/FLINK-31746
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink-1.14.0 (we know 1.14.0 is no longer supported, so we are also testing Flink-1.17.0 to see if it has the same issue). We run a batch processing job whose input is a file on disk and whose output is a Kafka topic, which should receive 170 messages when the workload finishes. During testing, we inject a fault (an IOException) in a taskmanager, and the batch processing job client then fails:
> {code:java}
> 2023-03-26T19:05:48,922 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 85c9bd56d6dd111f858b4b5a99551c53) {code}
> The IOException occurs in `BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader` when running `FileChannel.open`. This call is reached multiple times during a workload, so the fault can occur at several points.
> {code:java}
>     FileRegionReader(Path filePath) throws IOException {
>         this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
>         this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
>     }
>  {code}
> The call stack of this fault site:
> {code:java}
> (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader,<init>,200), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader,<init>,74), (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition,createReadView,221), (org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition,createSubpartitionView,205), (org.apache.flink.runtime.io.network.partition.ResultPartitionManager,createSubpartitionView,76), (org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel,requestSubpartition,133), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,internalRequestPartitions,330), (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,requestPartitions,299), (org.apache.flink.runtime.taskmanager.InputGateWithMetrics,requestPartitions,127), (org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1,runThrowing,50),
> (org.apache.flink.streaming.runtime.tasks.mailbox.Mail,run,90), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMailsNonBlocking,353), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMail,319), (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,runMailboxLoop,201), (org.apache.flink.streaming.runtime.tasks.StreamTask,runMailboxLoop,809),
> (org.apache.flink.streaming.runtime.tasks.StreamTask,invoke,761),
> (org.apache.flink.runtime.taskmanager.Task,runWithSystemExitMonitoring,958),
> (org.apache.flink.runtime.taskmanager.Task,restoreAndInvoke,937),
> (org.apache.flink.runtime.taskmanager.Task,doRun,766),
> (org.apache.flink.runtime.taskmanager.Task,run,575),
> (java.lang.Thread,run,748) {code}
> Inspecting the names of the threads where the fault occurs, we find that our workload is divided into these tasks:
> Split Reader: Custom File Source -> Flat Map (1/8)#0
> ...
> Split Reader: Custom File Source -> Flat Map (8/8)#0
> Keyed Aggregation -> Map -> Sink Unnamed Writer (1/8)#0
> ...
> Keyed Aggregation -> Map -> Sink Unnamed Writer (8/8)#0
> Sink Unnamed Committer (1/1)#0
>  
> A fault during “Split Reader” or “Keyed Aggregation” triggers this “Job failed” message, and our Kafka topic does not receive the complete, correct output (i.e., it receives fewer than 170 messages). However, if the exception happens during “Sink Unnamed Committer”, the client still reports “Job failed” even though our Kafka topic has already received the complete output.
> We assume that our workload is translated into a few stages: “Custom File Source -> Flat Map”, “Keyed Aggregation -> Map -> Sink Unnamed Writer”, and “Sink Unnamed Committer”. The last one seems to be responsible for some kind of “commit”, since a failure there does not affect our end-to-end results. However, a fault in the “commit” stage is still reported to the job client as a failure, which can be confusing.
> We have some questions about the design rationale:
>  # In some workloads, such as ours, the final “commit” does not seem to matter much. Can a failure there be treated as tolerable?
>  # The client log is confusing. It shows many exceptions but does not show in which stage of the workload the failure happened. The most useful information for the client would be something like “Sink Unnamed Committer (1/1)#0 (7b19f0a2f247b8f38fe9141c9872ef58) switched from RUNNING to FAILED”, which is not shown.
> P.S. The complete failure log of the job client is:
> {code:java}
> 2023-04-03T11:36:25,464 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.0.jar:1.14.0]
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayBatchClientMain.run(FlinkGrayBatchClientMain.java:69) ~[?:?]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.run(FlinkGrayClientMain.java:66) ~[?:?]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.main(FlinkGrayClientMain.java:92) ~[?:?]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[?:?]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[?:?]
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
>         at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
>         at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
>         at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_221]
> Caused by: java.io.IOException
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:229) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:226) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.createSubpartitionView(BufferWritingResultPartition.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:76) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:133) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:330) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:299) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:127) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:358) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:322) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221] {code}
> We feel that the job client should improve its logging by adding more details about the failure, such as which task failed (here, “Sink Unnamed Committer”).
> We are also checking Flink-1.17.0 to see if it has this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)