You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2018/11/09 12:33:00 UTC

[jira] [Updated] (FLINK-10840) BucketingSink incorrectly clears the pendingFiles List

     [ https://issues.apache.org/jira/browse/FLINK-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang updated FLINK-10840:
-----------------------------
    Description: 
BucketingSink#snapshotState (see the *comment* in this method):
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
   Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

   restoredBucketStates.clear();

   synchronized (state.bucketStates) {
      int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

      for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
         BucketState<T> bucketState = bucketStateEntry.getValue();

         if (bucketState.isWriterOpen) {
            bucketState.currentFileValidLength = bucketState.writer.flush();
         }

         synchronized (bucketState.pendingFilesPerCheckpoint) {
            bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
         }
         //This operation will make this collection prematurely emptied
         bucketState.pendingFiles = new ArrayList<>();
      }
      restoredBucketStates.add(state);

      if (LOG.isDebugEnabled()) {
         LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
      }
   }
}
{code}
 

 

  was:
BucketingSink#snapshotState : 
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
   Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

   restoredBucketStates.clear();

   synchronized (state.bucketStates) {
      int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

      for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
         BucketState<T> bucketState = bucketStateEntry.getValue();

         if (bucketState.isWriterOpen) {
            bucketState.currentFileValidLength = bucketState.writer.flush();
         }

         synchronized (bucketState.pendingFilesPerCheckpoint) {
            bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
         }
         //This operation will make this collection prematurely emptied
         bucketState.pendingFiles = new ArrayList<>();
      }
      restoredBucketStates.add(state);

      if (LOG.isDebugEnabled()) {
         LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
      }
   }
}
{code}
 

 


> BucketingSink incorrectly clears the pendingFiles List
> ------------------------------------------------------
>
>                 Key: FLINK-10840
>                 URL: https://issues.apache.org/jira/browse/FLINK-10840
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> BucketingSink#snapshotState (see the *comment* in this method):
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>    restoredBucketStates.clear();
>    synchronized (state.bucketStates) {
>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>          BucketState<T> bucketState = bucketStateEntry.getValue();
>          if (bucketState.isWriterOpen) {
>             bucketState.currentFileValidLength = bucketState.writer.flush();
>          }
>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>          }
>          //This operation will make this collection prematurely emptied
>          bucketState.pendingFiles = new ArrayList<>();
>       }
>       restoredBucketStates.add(state);
>       if (LOG.isDebugEnabled()) {
>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>       }
>    }
> }
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)