You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/05/08 05:46:00 UTC

[jira] [Updated] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint

     [ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-12379:
-----------------------------------
    Component/s: Runtime / Coordination

> Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
> ------------------------------------------------------------------------
>
>                 Key: FLINK-12379
>                 URL: https://issues.apache.org/jira/browse/FLINK-12379
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Coordination
>    Affects Versions: 1.8.0
>         Environment: GCS +
>  
> {code:java}
> <flink.version>1.8.0</flink.version>
> <java.version>1.8</java.version>
> <scala.binary.version>2.11</scala.binary.version>{code}
> {code:java}
> <!-- Cloud Storage: -->
> <!-- https://search.maven.org/search?q=g:com.google.cloud.bigdataoss -->
> <!-- https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md -->
> <dependency>
>   <groupId>com.google.cloud.bigdataoss</groupId>
>   <artifactId>gcs-connector</artifactId>
>   <version>hadoop2-1.9.16</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-filesystem_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-hadoop-fs</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-shaded-hadoop2</artifactId>
>   <version>${hadoop.version}-${flink.version}</version>
> </dependency>
> {code}
>  
>  
>            Reporter: Henrik
>            Priority: Major
>
> When running one standalone-job w/ parallelism=1 + one taskmanager, you will shortly get this crash
> {code:java}
> 2019-04-30 22:20:02,928 WARN  org.apache.flink.runtime.jobmaster.JobMaster                  - Error while processing checkpoint acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-5/_metadata' already exists
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> Caused by: java.nio.file.FileAlreadyExistsException: Object gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-5/_metadata already exists.
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918)
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410)
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286)
>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82)
>     ... 19 more
> 2019-04-30 22:20:03,114 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1556662802928 for job 00000000000000000000000000000000.{code}
> My guess at why; concurrent checkpoint writers are updating the _metadata resource concurrently. They should be using optimistic concurrency control with ETag on GCS, and then retry until successful.
> When running with parallelism=4, you always seem to get this, even after deleting all checkpoints:
> {code:java}
> [analytics-job-cluster-668886c96b-mhqmc job] 2019-04-30 22:50:35,175 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Process -> Timestamps/Watermarks -> our_events (1/4) of job 00000000000000000000000000000000 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.{code}
> Or in short: with parallelism > 1, your Flink job never makes progress.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)