Posted to dev@flink.apache.org by "Youjun Yuan (Jira)" <ji...@apache.org> on 2021/09/07 09:17:00 UTC

[jira] [Created] (FLINK-24187) Could not commit s3 file after JM restart during state initialization

Youjun Yuan created FLINK-24187:
-----------------------------------

             Summary: Could not commit s3 file after JM restart during state initialization
                 Key: FLINK-24187
                 URL: https://issues.apache.org/jira/browse/FLINK-24187
             Project: Flink
          Issue Type: Bug
          Components: FileSystems
    Affects Versions: 1.12.1
            Reporter: Youjun Yuan


We have a SQL job that consumes from Kafka and writes to a Hive table, with the data stored in S3.

One day the ZooKeeper leader failed over, which caused the Flink job to restart. However, the job got stuck during state restore with the following error:
{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    ... 22 more
Caused by: java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_242]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:262) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:249) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:169) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    ... 22 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)