Posted to dev@flink.apache.org by "Youjun Yuan (Jira)" <ji...@apache.org> on 2021/09/07 09:17:00 UTC
[jira] [Created] (FLINK-24187) Could not commit s3 file after JM restart during state initialization
Youjun Yuan created FLINK-24187:
-----------------------------------
Summary: Could not commit s3 file after JM restart during state initialization
Key: FLINK-24187
URL: https://issues.apache.org/jira/browse/FLINK-24187
Project: Flink
Issue Type: Bug
Components: FileSystems
Affects Versions: 1.12.1
Reporter: Youjun Yuan
We have a SQL job that consumes from Kafka and writes to a Hive table whose data is stored in S3.
One day the ZooKeeper leader failed over, which caused the Flink job to restart. However, the job got stuck during state restore with the following error:
{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	... 22 more
Caused by: java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_242]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:262) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:249) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:169) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	... 22 more
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)