Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/08/03 13:59:02 UTC
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112770#comment-16112770 ]
ASF GitHub Bot commented on FLINK-5486:
---------------------------------------
GitHub user zhangminglei commented on the issue:
https://github.com/apache/flink/pull/4356
```
Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 91.985 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase
testExternalizedFullRocksDBCheckpointsStandalone(org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase) Time elapsed: 29.976 sec <<< ERROR!
java.io.IOException: java.lang.Exception: Failed to complete checkpoint
at org.apache.flink.runtime.testingUtils.TestingCluster.requestCheckpoint(TestingCluster.scala:399)
at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedCheckpoints(ExternalizedCheckpointITCase.java:218)
at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedFullRocksDBCheckpointsStandalone(ExternalizedCheckpointITCase.java:78)
Caused by: java.lang.Exception: Failed to complete checkpoint
at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:375)
at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:358)
at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
at akka.dispatch.OnComplete.internal(Future.scala:247)
at akka.dispatch.OnComplete.internal(Future.scala:245)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
```
> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> --------------------------------------------------------------------
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
> synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
> }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be moved inside the synchronized block. Otherwise, while handlePendingFilesForPreviousCheckpoints() is still processing the map, some of its entries may be cleared concurrently.
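For illustration, the change proposed in the description could look roughly like the sketch below. It is not the actual BucketingSink code or the patch in the pull request: the `RestoredBucketStateSketch` class, the simplified `BucketState`, the `Map<Long, List<String>>` type, and the body of `handlePendingFilesForPreviousCheckpoints()` are stand-ins assumed here so that the example compiles on its own. The only point it is meant to show is that the handling call and `clear()` run under the same lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoredBucketStateSketch {

    /** Hypothetical, simplified stand-in for the sink's per-bucket state. */
    static final class BucketState {
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    /**
     * Stand-in for the real handler (an assumption, not Flink's logic):
     * it only collects the pending file names it was asked to process.
     */
    static List<String> handlePendingFilesForPreviousCheckpoints(
            Map<Long, List<String>> pendingFilesPerCheckpoint) {
        List<String> handled = new ArrayList<>();
        for (List<String> files : pendingFilesPerCheckpoint.values()) {
            handled.addAll(files);
        }
        return handled;
    }

    /** Handles and clears the pending files while holding one lock. */
    static List<String> handleRestoredBucketState(BucketState bucketState) {
        synchronized (bucketState.pendingFilesPerCheckpoint) {
            // The call now sits inside the synchronized block, so another
            // thread locking on the same map cannot clear it mid-iteration.
            List<String> handled =
                handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
            bucketState.pendingFilesPerCheckpoint.clear();
            return handled;
        }
    }

    public static void main(String[] args) {
        BucketState state = new BucketState();
        state.pendingFilesPerCheckpoint.put(1L, Arrays.asList("part-0-0", "part-0-1"));
        state.pendingFilesPerCheckpoint.put(2L, Arrays.asList("part-0-2"));

        List<String> handled = handleRestoredBucketState(state);
        System.out.println("handled files: " + handled.size());
        System.out.println("map cleared: " + state.pendingFilesPerCheckpoint.isEmpty());
    }
}
```

This only helps if every other code path that mutates `pendingFilesPerCheckpoint` synchronizes on the same object; whether that holds in the sink itself is not something this sketch can confirm.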
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)