Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/08/03 13:59:02 UTC

[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

    [ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112770#comment-16112770 ] 

ASF GitHub Bot commented on FLINK-5486:
---------------------------------------

Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/4356
  
    ```
    Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 91.985 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase
    testExternalizedFullRocksDBCheckpointsStandalone(org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase)  Time elapsed: 29.976 sec  <<< ERROR!
    java.io.IOException: java.lang.Exception: Failed to complete checkpoint
    	at org.apache.flink.runtime.testingUtils.TestingCluster.requestCheckpoint(TestingCluster.scala:399)
    	at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedCheckpoints(ExternalizedCheckpointITCase.java:218)
    	at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedFullRocksDBCheckpointsStandalone(ExternalizedCheckpointITCase.java:78)
    Caused by: java.lang.Exception: Failed to complete checkpoint
    	at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:375)
    	at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:358)
    	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    	at akka.dispatch.OnComplete.internal(Future.scala:247)
    	at akka.dispatch.OnComplete.internal(Future.scala:245)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    ```


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> --------------------------------------------------------------------
>
>                 Key: FLINK-5486
>                 URL: https://issues.apache.org/jira/browse/FLINK-5486
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Ted Yu
>            Assignee: mingleizhang
>             Fix For: 1.3.3
>
>
> Here is the related code:
> {code}
>       handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>       synchronized (bucketState.pendingFilesPerCheckpoint) {
>         bucketState.pendingFilesPerCheckpoint.clear();
>       }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed in the synchronized block. Otherwise, while handlePendingFilesForPreviousCheckpoints() is still processing the map, some of its entries may be cleared concurrently.
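
A minimal sketch of the change the description asks for, assuming a simplified stand-in for the sink's bucket state. The class RestoredBucketStateSketch, the BucketState type with its map of checkpoint id to file names, and the handlePendingFiles() helper are all hypothetical and only illustrate the locking pattern; this is not the code from BucketingSink or from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoredBucketStateSketch {

    // Hypothetical stand-in for the per-bucket state held by the sink.
    static final class BucketState {
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Hypothetical stand-in for handlePendingFilesForPreviousCheckpoints():
    // it only collects the file names it visits.
    static List<String> handlePendingFiles(Map<Long, List<String>> pending) {
        List<String> handled = new ArrayList<>();
        for (List<String> files : pending.values()) {
            handled.addAll(files);
        }
        return handled;
    }

    // Processing and clearing happen under the same monitor, so the map
    // cannot be cleared by another thread holding that monitor while it
    // is still being processed.
    static List<String> handleRestoredBucketState(BucketState bucketState) {
        synchronized (bucketState.pendingFilesPerCheckpoint) {
            List<String> handled = handlePendingFiles(bucketState.pendingFilesPerCheckpoint);
            bucketState.pendingFilesPerCheckpoint.clear();
            return handled;
        }
    }

    public static void main(String[] args) {
        BucketState state = new BucketState();
        state.pendingFilesPerCheckpoint.put(1L, Arrays.asList("part-0-0.pending"));
        state.pendingFilesPerCheckpoint.put(2L, Arrays.asList("part-0-1.pending"));

        List<String> handled = handleRestoredBucketState(state);
        System.out.println("handled files: " + handled.size());
        System.out.println("map empty: " + state.pendingFilesPerCheckpoint.isEmpty());
    }
}
```

Widening the block only helps if every other reader and writer of the map takes the same lock; a thread that touches pendingFilesPerCheckpoint without synchronizing on it can still interleave with the processing.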



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)