Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2018/11/09 12:36:00 UTC

[jira] [Commented] (FLINK-10840) BucketingSink incorrectly clears the pendingFiles List

    [ https://issues.apache.org/jira/browse/FLINK-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681390#comment-16681390 ] 

vinoyang commented on FLINK-10840:
----------------------------------

[~till.rohrmann] and [~Zentol] I haven't actually run into this bug, but from reading the code, the line marked with the comment in the snippet below will leave the current checkpoint's pending files list empty, right? The list stored for the checkpoint and bucketState.pendingFiles are the same reference.
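
The aliasing question can be checked in isolation. The following is a minimal standalone sketch, not Flink code: State is a hypothetical stand-in for BucketState that keeps only the pendingFiles and pendingFilesPerCheckpoint fields, and the file names and checkpoint id are made up. It compares assigning a new list to the field after put() with calling clear() on the same list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFilesAliasing {

    // Hypothetical stand-in for BucketState: only the two fields involved.
    static class State {
        List<String> pendingFiles = new ArrayList<>();
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Builds a state with two made-up pending file names.
    static State stateWithTwoPendingFiles() {
        State s = new State();
        s.pendingFiles.add("part-0-0.pending");
        s.pendingFiles.add("part-0-1.pending");
        return s;
    }

    // Same pattern as the snippet: store the list, then point the field at a new list.
    static int storedSizeAfterReassign() {
        State s = stateWithTwoPendingFiles();
        s.pendingFilesPerCheckpoint.put(1L, s.pendingFiles);
        s.pendingFiles = new ArrayList<>(); // rebinds the field only
        return s.pendingFilesPerCheckpoint.get(1L).size();
    }

    // Contrast case: store the list, then clear that same list object.
    static int storedSizeAfterClear() {
        State s = stateWithTwoPendingFiles();
        s.pendingFilesPerCheckpoint.put(1L, s.pendingFiles);
        s.pendingFiles.clear(); // mutates the object the map also holds
        return s.pendingFilesPerCheckpoint.get(1L).size();
    }

    public static void main(String[] args) {
        System.out.println("stored size after reassign: " + storedSizeAfterReassign());
        System.out.println("stored size after clear():  " + storedSizeAfterClear());
    }
}
```

In plain Java, assigning a new ArrayList to the field only rebinds the field, so the list already stored in the map keeps its two elements. Calling clear() would empty it for both holders. Whether the same holds in BucketingSink itself would need to be confirmed against the actual class.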

> BucketingSink incorrectly clears the pendingFiles List
> ------------------------------------------------------
>
>                 Key: FLINK-10840
>                 URL: https://issues.apache.org/jira/browse/FLINK-10840
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> BucketingSink#snapshotState : (see the *comment* in this method)
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>    restoredBucketStates.clear();
>    synchronized (state.bucketStates) {
>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>          BucketState<T> bucketState = bucketStateEntry.getValue();
>          if (bucketState.isWriterOpen) {
>             bucketState.currentFileValidLength = bucketState.writer.flush();
>          }
>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>          }
>          // This operation empties the collection prematurely
>          bucketState.pendingFiles = new ArrayList<>();
>       }
>       restoredBucketStates.add(state);
>       if (LOG.isDebugEnabled()) {
>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>       }
>    }
> }
> {code}
>  
>  


