Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2018/11/09 12:33:00 UTC
[jira] [Updated] (FLINK-10840) BucketingSink incorrectly clears the pendingFiles List
[ https://issues.apache.org/jira/browse/FLINK-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang updated FLINK-10840:
-----------------------------
Description:
BucketingSink#snapshotState (see the *comment* in this method):
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            // This reassignment empties the pendingFiles collection prematurely.
            bucketState.pendingFiles = new ArrayList<>();
        }

        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
{code}
was:
BucketingSink#snapshotState:
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            // This reassignment empties the pendingFiles collection prematurely.
            bucketState.pendingFiles = new ArrayList<>();
        }

        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
{code}
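To make the effect of the flagged line concrete, here is a minimal standalone sketch of the two statements in question. It is not Flink code: the class PendingFilesSketch, the nested Bucket type and the snapshot helper are hypothetical stand-ins that keep only the two fields the snippet touches. It shows plain Java reference semantics: after the reassignment, the pendingFiles field points at a new empty list, while the list that was put into pendingFilesPerCheckpoint still holds its entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFilesSketch {

    // Hypothetical stand-in for BucketState; only the two relevant fields are modeled.
    static class Bucket {
        List<String> pendingFiles = new ArrayList<>();
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Mirrors the two statements from snapshotState: record the list, then reassign the field.
    static void snapshot(Bucket bucket, long checkpointId) {
        bucket.pendingFilesPerCheckpoint.put(checkpointId, bucket.pendingFiles);
        // The field now refers to a fresh, empty list; the recorded list is untouched.
        bucket.pendingFiles = new ArrayList<>();
    }

    public static void main(String[] args) {
        Bucket bucket = new Bucket();
        bucket.pendingFiles.add("part-0-0");
        bucket.pendingFiles.add("part-0-1");

        snapshot(bucket, 1L);

        // prints "pendingFiles after snapshot: []"
        System.out.println("pendingFiles after snapshot: " + bucket.pendingFiles);
        // prints "recorded for checkpoint 1: [part-0-0, part-0-1]"
        System.out.println("recorded for checkpoint 1: " + bucket.pendingFilesPerCheckpoint.get(1L));
    }
}
```

By contrast, calling clear() on the same list instead of reassigning the field would also empty the entry stored in the map, because both references point at one list object. Whether the empty pendingFiles field is a problem in BucketingSink depends on what still reads that field after the snapshot, which this sketch does not model.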
> BucketingSink incorrectly clears the pendingFiles List
> ------------------------------------------------------
>
> Key: FLINK-10840
> URL: https://issues.apache.org/jira/browse/FLINK-10840
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
>
> BucketingSink#snapshotState (see the *comment* in this method):
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>
>     restoredBucketStates.clear();
>
>     synchronized (state.bucketStates) {
>         int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>
>         for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>             BucketState<T> bucketState = bucketStateEntry.getValue();
>
>             if (bucketState.isWriterOpen) {
>                 bucketState.currentFileValidLength = bucketState.writer.flush();
>             }
>
>             synchronized (bucketState.pendingFilesPerCheckpoint) {
>                 bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>             }
>             // This reassignment empties the pendingFiles collection prematurely.
>             bucketState.pendingFiles = new ArrayList<>();
>         }
>
>         restoredBucketStates.add(state);
>
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>         }
>     }
> }
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)