You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/13 09:13:00 UTC

[flink-table-store] 01/02: [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 2073bd66a4e41ac47a98b0c0370b0ccdeb516c62
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Nov 23 15:05:17 2022 +0800

    [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink
---
 ...rator.java => FullChangelogStoreSinkWrite.java} | 108 +++++++++----------
 .../table/store/connector/sink/StoreSink.java      |  27 +++--
 .../table/store/connector/sink/StoreSinkWrite.java |  55 ++++++++++
 .../store/connector/sink/StoreSinkWriteImpl.java   | 116 +++++++++++++++++++++
 .../store/connector/sink/StoreWriteOperator.java   |  80 ++++----------
 5 files changed, 261 insertions(+), 125 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
similarity index 84%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 59f392cf..58827b4f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -36,11 +36,8 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -52,40 +49,36 @@ import java.util.Set;
 import java.util.TreeMap;
 
 /**
- * A {@link StoreWriteOperator} for {@link
- * org.apache.flink.table.store.CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog producer.
+ * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
+ * producer. This writer will perform full compaction once in a while.
  */
-public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
-
-    private static final long serialVersionUID = 1L;
+public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
 
     private final long fullCompactionThresholdMs;
 
-    private transient Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
-    private transient NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
-    private transient ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
+    private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
+    private final NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
+    private final ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
 
-    private transient Long currentFirstWriteMs;
-    private transient NavigableMap<Long, Long> firstWriteMs;
-    private transient ListState<Tuple2<Long, Long>> firstWriteMsState;
+    private Long currentFirstWriteMs;
+    private final NavigableMap<Long, Long> firstWriteMs;
+    private final ListState<Tuple2<Long, Long>> firstWriteMsState;
 
-    private transient Long snapshotIdentifierToCheck;
+    private Long snapshotIdentifierToCheck;
 
-    public FullChangelogStoreWriteOperator(
+    public FullChangelogStoreSinkWrite(
             FileStoreTable table,
+            StateInitializationContext context,
             String initialCommitUser,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction,
-            long fullCompactionThresholdMs) {
-        super(table, initialCommitUser, overwritePartition, logSinkFunction);
-        this.fullCompactionThresholdMs = fullCompactionThresholdMs;
-    }
+            IOManager ioManager,
+            boolean isOverwrite,
+            long fullCompactionThresholdMs)
+            throws Exception {
+        super(table, context, initialCommitUser, ioManager, isOverwrite);
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        super.initializeState(context);
+        this.fullCompactionThresholdMs = fullCompactionThresholdMs;
 
+        currentWrittenBuckets = new HashSet<>();
         TupleSerializer<Tuple3<Long, BinaryRowData, Integer>> writtenBucketStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple3<Long, BinaryRowData, Integer>>) (Class<?>) Tuple3.class,
@@ -110,6 +103,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
                                         .computeIfAbsent(t.f0, k -> new HashSet<>())
                                         .add(Tuple2.of(t.f1, t.f2)));
 
+        currentFirstWriteMs = null;
         TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
@@ -126,18 +120,20 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
     }
 
     @Override
-    public void open() throws Exception {
-        super.open();
-        currentWrittenBuckets = new HashSet<>();
-        currentFirstWriteMs = null;
+    public SinkRecord write(RowData rowData) throws Exception {
+        SinkRecord sinkRecord = super.write(rowData);
+        touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+        return sinkRecord;
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception {
-        SinkRecord record = writeRecord(element);
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        super.compact(partition, bucket, fullCompaction);
+        touchBucket(partition, bucket);
+    }
 
-        BinaryRowData partition = record.partition();
-        int bucket = record.bucket();
+    private void touchBucket(BinaryRowData partition, int bucket) {
         // partition is a reused BinaryRowData
         // we first check if the tuple exists to minimize copying
         if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
@@ -150,27 +146,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
     }
 
     @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-
-        List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
-        for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
-                writtenBuckets.entrySet()) {
-            for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
-                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
-            }
-        }
-        writtenBucketState.update(writtenBucketList);
-
-        List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
-        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
-            firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
-        }
-        firstWriteMsState.update(firstWriteMsList);
-    }
-
-    @Override
-    protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
         if (snapshotIdentifierToCheck != null) {
             Optional<Snapshot> snapshot = findSnapshot(snapshotIdentifierToCheck);
@@ -260,4 +236,24 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
                         commitUser,
                         identifierToCheck));
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
+        for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
+                writtenBuckets.entrySet()) {
+            for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
+                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
+            }
+        }
+        writtenBucketState.update(writtenBucketList);
+
+        List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
+        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
+            firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+        firstWriteMsState.update(firstWriteMsList);
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index b556011b..5e95c875 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -82,17 +82,28 @@ public class StoreSink implements Serializable {
             return new StoreCompactOperator(table, initialCommitUser, compactPartitionSpec);
         }
 
+        boolean isOverwrite = overwritePartition != null;
+        StoreSinkWrite.Provider writeProvider;
         if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            return new FullChangelogStoreWriteOperator(
-                    table,
-                    initialCommitUser,
-                    overwritePartition,
-                    logSinkFunction,
-                    table.options().changelogProducerFullCompactionTriggerInterval().toMillis());
+            long fullCompactionThresholdMs =
+                    table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
+            writeProvider =
+                    (table, context, ioManager) ->
+                            new FullChangelogStoreSinkWrite(
+                                    table,
+                                    context,
+                                    initialCommitUser,
+                                    ioManager,
+                                    isOverwrite,
+                                    fullCompactionThresholdMs);
+        } else {
+            writeProvider =
+                    (table, context, ioManager) ->
+                            new StoreSinkWriteImpl(
+                                    table, context, initialCommitUser, ioManager, isOverwrite);
         }
 
-        return new StoreWriteOperator(
-                table, initialCommitUser, overwritePartition, logSinkFunction);
+        return new StoreWriteOperator(table, logSinkFunction, writeProvider);
     }
 
     private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
new file mode 100644
index 00000000..fe3465d9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/** Helper interface of {@link StoreWriteOperator} for different types of table store sinks. */
+interface StoreSinkWrite {
+
+    SinkRecord write(RowData rowData) throws Exception;
+
+    SinkRecord toLogRecord(SinkRecord record);
+
+    void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+
+    List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException;
+
+    void snapshotState(StateSnapshotContext context) throws Exception;
+
+    void close() throws Exception;
+
+    @FunctionalInterface
+    interface Provider extends Serializable {
+
+        StoreSinkWrite provide(
+                FileStoreTable table, StateInitializationContext context, IOManager ioManager)
+                throws Exception;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
new file mode 100644
index 00000000..374616ec
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableWrite;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Default implementation of {@link StoreSinkWrite}. It snapshots no state of its own. */
+public class StoreSinkWriteImpl implements StoreSinkWrite {
+
+    protected final FileStoreTable table;
+    protected final String commitUser;
+    protected final TableWrite write;
+
+    public StoreSinkWriteImpl(
+            FileStoreTable table,
+            StateInitializationContext context,
+            String initialCommitUser,
+            IOManager ioManager,
+            boolean isOverwrite)
+            throws Exception {
+        this.table = table;
+
+        // Each job can only have one user name and this name must be consistent across restarts.
+        // We cannot use job id as commit user name here because the user may change the job id by
+        // creating a savepoint, stopping the job and then resuming from the savepoint.
+        commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, initialCommitUser);
+
+        // State will be null if the upstream of this subtask has finished, but some other subtasks
+        // are still running.
+        // See comments of StateUtils.getSingleValueFromState for more detail.
+        //
+        // If the state is null, no new records will come. We only need to deal with checkpoints and
+        // close events.
+        if (commitUser == null) {
+            write = null;
+        } else {
+            write = table.newWrite(commitUser).withIOManager(ioManager).withOverwrite(isOverwrite);
+        }
+    }
+
+    @Override
+    public SinkRecord write(RowData rowData) throws Exception {
+        return write.write(rowData);
+    }
+
+    @Override
+    public SinkRecord toLogRecord(SinkRecord record) {
+        return write.toLogRecord(record);
+    }
+
+    @Override
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        write.compact(partition, bucket, fullCompaction);
+    }
+
+    @Override
+    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+            throws IOException {
+        List<Committable> committables = new ArrayList<>();
+        if (write != null) {
+            try {
+                for (FileCommittable committable :
+                        write.prepareCommit(doCompaction, checkpointId)) {
+                    committables.add(
+                            new Committable(checkpointId, Committable.Kind.FILE, committable));
+                }
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+        return committables;
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (write != null) {
+            write.close();
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index f3bfcb19..05696adc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -35,57 +35,41 @@ import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.TableWrite;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /** A {@link PrepareCommitOperator} to write records. */
 public class StoreWriteOperator extends PrepareCommitOperator {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     protected final FileStoreTable table;
 
-    /**
-     * This commitUser is valid only for new jobs. After the job starts, this commitUser will be
-     * recorded into the states of write and commit operators. When the job restarts, commitUser
-     * will be recovered from states and this value is ignored.
-     */
-    private final String initialCommitUser;
+    @Nullable private final LogSinkFunction logSinkFunction;
 
-    @Nullable private final Map<String, String> overwritePartition;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
 
-    @Nullable private final LogSinkFunction logSinkFunction;
+    private transient StoreSinkWrite write;
 
     private transient SimpleContext sinkContext;
 
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
     private long currentWatermark = Long.MIN_VALUE;
 
-    @Nullable protected transient TableWrite write;
-
-    /** This is the real commit user read from state. */
-    @Nullable protected transient String commitUser;
-
     @Nullable private transient LogWriteCallback logCallback;
 
     public StoreWriteOperator(
             FileStoreTable table,
-            String initialCommitUser,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction) {
+            @Nullable LogSinkFunction logSinkFunction,
+            StoreSinkWrite.Provider storeSinkWriteProvider) {
         this.table = table;
-        this.initialCommitUser = initialCommitUser;
-        this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
     }
 
     @Override
@@ -102,31 +86,18 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-
+        write =
+                storeSinkWriteProvider.provide(
+                        table, context, getContainingTask().getEnvironment().getIOManager());
         if (logSinkFunction != null) {
             StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction);
         }
-
-        // each job can only have one user name and this name must be consistent across restarts
-        // we cannot use job id as commit user name here because user may change job id by creating
-        // a savepoint, stop the job and then resume from savepoint
-        commitUser =
-                StateUtils.getSingleValueFromState(
-                        context, "commit_user_state", String.class, initialCommitUser);
-        // see comments of StateUtils.getSingleValueFromState for why commitUser may be null
-        if (commitUser == null) {
-            write = null;
-        } else {
-            write =
-                    table.newWrite(commitUser)
-                            .withIOManager(getContainingTask().getEnvironment().getIOManager())
-                            .withOverwrite(overwritePartition != null);
-        }
     }
 
     @Override
     public void open() throws Exception {
         super.open();
+
         this.sinkContext = new SimpleContext(getProcessingTimeService());
         if (logSinkFunction != null) {
             FunctionUtils.openFunction(logSinkFunction, new Configuration());
@@ -138,6 +109,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
+
         this.currentWatermark = mark.getTimestamp();
         if (logSinkFunction != null) {
             logSinkFunction.writeWatermark(
@@ -147,10 +119,6 @@ public class StoreWriteOperator extends PrepareCommitOperator {
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        writeRecord(element);
-    }
-
-    protected SinkRecord writeRecord(StreamRecord<RowData> element) throws Exception {
         sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
 
         SinkRecord record;
@@ -165,13 +133,14 @@ public class StoreWriteOperator extends PrepareCommitOperator {
             SinkRecord logRecord = write.toLogRecord(record);
             logSinkFunction.invoke(logRecord, sinkContext);
         }
-
-        return record;
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
+
+        write.snapshotState(context);
+
         if (logSinkFunction != null) {
             StreamingFunctionUtils.snapshotFunctionState(
                     context, getOperatorStateBackend(), logSinkFunction);
@@ -181,6 +150,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void finish() throws Exception {
         super.finish();
+
         if (logSinkFunction != null) {
             logSinkFunction.finish();
         }
@@ -189,9 +159,8 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void close() throws Exception {
         super.close();
-        if (write != null) {
-            write.close();
-        }
+
+        write.close();
 
         if (logSinkFunction != null) {
             FunctionUtils.closeFunction(logSinkFunction);
@@ -219,18 +188,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
-        List<Committable> committables = new ArrayList<>();
-        if (write != null) {
-            try {
-                for (FileCommittable committable :
-                        write.prepareCommit(doCompaction, checkpointId)) {
-                    committables.add(
-                            new Committable(checkpointId, Committable.Kind.FILE, committable));
-                }
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
+        List<Committable> committables = write.prepareCommit(doCompaction, checkpointId);
 
         if (logCallback != null) {
             try {