Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2018/11/09 12:33:00 UTC
[jira] [Created] (FLINK-10840) BucketingSink incorrectly clears the pendingFiles List
vinoyang created FLINK-10840:
--------------------------------
Summary: BucketingSink incorrectly clears the pendingFiles List
Key: FLINK-10840
URL: https://issues.apache.org/jira/browse/FLINK-10840
Project: Flink
Issue Type: Bug
Components: Streaming Connectors
Reporter: vinoyang
Assignee: vinoyang
BucketingSink#snapshotState:
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            // This operation empties the pendingFiles collection prematurely
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
{code}
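To make the two statements in question easier to reason about, here is a minimal standalone sketch. It is not Flink code: the class PendingFilesSketch, its fields, and the file name "part-0-0" are hypothetical stand-ins for BucketState, chosen only to show which list object each reference points to once pendingFiles has been stored in the map and then reassigned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for BucketState; only the two relevant fields are modelled.
class PendingFilesSketch {
    List<String> pendingFiles = new ArrayList<>();
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

    // Mirrors the put(...) followed by the reassignment in snapshotState.
    void snapshot(long checkpointId) {
        // The map now holds a reference to the current list object.
        pendingFilesPerCheckpoint.put(checkpointId, pendingFiles);
        // The field is pointed at a fresh, empty list; the old object is not modified.
        pendingFiles = new ArrayList<>();
    }

    public static void main(String[] args) {
        PendingFilesSketch bucket = new PendingFilesSketch();
        bucket.pendingFiles.add("part-0-0"); // assumed file name, for illustration only
        bucket.snapshot(1L);

        System.out.println("recorded for checkpoint 1: " + bucket.pendingFilesPerCheckpoint.get(1L));
        System.out.println("pendingFiles field: " + bucket.pendingFiles);
    }
}
```

In this simplified model, the list recorded under the checkpoint id still contains "part-0-0", while the pendingFiles field refers to a new empty list. Whether that matches the behaviour the sink needs is the question this issue raises; the sketch only covers plain Java reference semantics and ignores the sink's locking and restore logic.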