Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/13 09:13:00 UTC
[flink-table-store] 01/02: [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 2073bd66a4e41ac47a98b0c0370b0ccdeb516c62
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Nov 23 15:05:17 2022 +0800
[FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink
---
...rator.java => FullChangelogStoreSinkWrite.java} | 108 +++++++++----------
.../table/store/connector/sink/StoreSink.java | 27 +++--
.../table/store/connector/sink/StoreSinkWrite.java | 55 ++++++++++
.../store/connector/sink/StoreSinkWriteImpl.java | 116 +++++++++++++++++++++
.../store/connector/sink/StoreWriteOperator.java | 80 ++++----------
5 files changed, 261 insertions(+), 125 deletions(-)
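In short, the commit extracts the write-side logic of StoreWriteOperator into a StoreSinkWrite interface plus a serializable StoreSinkWrite.Provider factory. The operator keeps only the provider and builds the actual writer in initializeState, where runtime-only objects (the StateInitializationContext and the IOManager) become available. A minimal sketch of this pattern, using simplified stand-in types rather than the real Flink classes:

    import java.io.Serializable;

    /** Stand-in for the runtime-only objects the real operator receives
     *  (StateInitializationContext, IOManager); illustrative only. */
    class RuntimeDeps {}

    interface SinkWrite {
        void write(String record) throws Exception;
    }

    /** Serializable factory: the captured configuration travels with the
     *  job graph, while construction is deferred to operator initialization. */
    @FunctionalInterface
    interface SinkWriteProvider extends Serializable {
        SinkWrite provide(RuntimeDeps deps) throws Exception;
    }

    class WriteOperator implements Serializable {
        private final SinkWriteProvider provider; // serialized with the job graph
        private transient SinkWrite write;        // created only at runtime

        WriteOperator(SinkWriteProvider provider) {
            this.provider = provider;
        }

        /** Mirrors StoreWriteOperator.initializeState in the diff below. */
        void initializeState(RuntimeDeps deps) throws Exception {
            write = provider.provide(deps);
        }
    }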
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
similarity index 84%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 59f392cf..58827b4f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -36,11 +36,8 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.table.store.table.sink.SinkRecord;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -52,40 +49,36 @@ import java.util.Set;
import java.util.TreeMap;
/**
- * A {@link StoreWriteOperator} for {@link
- * org.apache.flink.table.store.CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog producer.
+ * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
+ * producer. This writer triggers a full compaction at regular time intervals.
*/
-public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
-
- private static final long serialVersionUID = 1L;
+public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
private final long fullCompactionThresholdMs;
- private transient Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
- private transient NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
- private transient ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
+ private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
+ private final NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
+ private final ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
- private transient Long currentFirstWriteMs;
- private transient NavigableMap<Long, Long> firstWriteMs;
- private transient ListState<Tuple2<Long, Long>> firstWriteMsState;
+ private Long currentFirstWriteMs;
+ private final NavigableMap<Long, Long> firstWriteMs;
+ private final ListState<Tuple2<Long, Long>> firstWriteMsState;
- private transient Long snapshotIdentifierToCheck;
+ private Long snapshotIdentifierToCheck;
- public FullChangelogStoreWriteOperator(
+ public FullChangelogStoreSinkWrite(
FileStoreTable table,
+ StateInitializationContext context,
String initialCommitUser,
- @Nullable Map<String, String> overwritePartition,
- @Nullable LogSinkFunction logSinkFunction,
- long fullCompactionThresholdMs) {
- super(table, initialCommitUser, overwritePartition, logSinkFunction);
- this.fullCompactionThresholdMs = fullCompactionThresholdMs;
- }
+ IOManager ioManager,
+ boolean isOverwrite,
+ long fullCompactionThresholdMs)
+ throws Exception {
+ super(table, context, initialCommitUser, ioManager, isOverwrite);
- @SuppressWarnings("unchecked")
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
+ this.fullCompactionThresholdMs = fullCompactionThresholdMs;
+ currentWrittenBuckets = new HashSet<>();
TupleSerializer<Tuple3<Long, BinaryRowData, Integer>> writtenBucketStateSerializer =
new TupleSerializer<>(
(Class<Tuple3<Long, BinaryRowData, Integer>>) (Class<?>) Tuple3.class,
@@ -110,6 +103,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
.computeIfAbsent(t.f0, k -> new HashSet<>())
.add(Tuple2.of(t.f1, t.f2)));
+ currentFirstWriteMs = null;
TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
new TupleSerializer<>(
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
@@ -126,18 +120,20 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
}
@Override
- public void open() throws Exception {
- super.open();
- currentWrittenBuckets = new HashSet<>();
- currentFirstWriteMs = null;
+ public SinkRecord write(RowData rowData) throws Exception {
+ SinkRecord sinkRecord = super.write(rowData);
+ touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+ return sinkRecord;
}
@Override
- public void processElement(StreamRecord<RowData> element) throws Exception {
- SinkRecord record = writeRecord(element);
+ public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+ throws Exception {
+ super.compact(partition, bucket, fullCompaction);
+ touchBucket(partition, bucket);
+ }
- BinaryRowData partition = record.partition();
- int bucket = record.bucket();
+ private void touchBucket(BinaryRowData partition, int bucket) {
// partition is a reused BinaryRowData
// we first check if the tuple exists to minimize copying
if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
@@ -150,27 +146,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
}
@Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
-
- List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
- for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
- writtenBuckets.entrySet()) {
- for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
- writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
- }
- }
- writtenBucketState.update(writtenBucketList);
-
- List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
- for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
- firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
- }
- firstWriteMsState.update(firstWriteMsList);
- }
-
- @Override
- protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+ public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException {
if (snapshotIdentifierToCheck != null) {
Optional<Snapshot> snapshot = findSnapshot(snapshotIdentifierToCheck);
@@ -260,4 +236,24 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
commitUser,
identifierToCheck));
}
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+
+ List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
+ for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
+ writtenBuckets.entrySet()) {
+ for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
+ writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
+ }
+ }
+ writtenBucketState.update(writtenBucketList);
+
+ List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
+ for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
+ firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ }
+ firstWriteMsState.update(firstWriteMsList);
+ }
}
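The bucket bookkeeping above — touchBucket collecting (partition, bucket) pairs, writtenBuckets grouping them per checkpoint until a full compaction is confirmed — can be summarized by the following sketch. Partitions are simplified to String keys (the real code checks containment first and copies the reused BinaryRowData before storing it), and the callback names are illustrative:

    import java.util.HashSet;
    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.TreeMap;

    /** Simplified sketch of the per-checkpoint bucket bookkeeping. */
    class BucketTracker {
        /** Buckets touched since the last checkpoint. */
        private final Set<String> currentWrittenBuckets = new HashSet<>();
        /** Checkpoint id -> buckets written for that checkpoint; kept until
         *  the corresponding full compaction is confirmed. */
        private final NavigableMap<Long, Set<String>> writtenBuckets = new TreeMap<>();

        void touchBucket(String partition, int bucket) {
            currentWrittenBuckets.add(partition + "/" + bucket);
        }

        /** Roll the current buckets into the per-checkpoint map. */
        void onSnapshot(long checkpointId) {
            if (!currentWrittenBuckets.isEmpty()) {
                writtenBuckets
                        .computeIfAbsent(checkpointId, k -> new HashSet<>())
                        .addAll(currentWrittenBuckets);
                currentWrittenBuckets.clear();
            }
        }

        /** Once a full compaction covering checkpointId is confirmed,
         *  earlier bookkeeping can be dropped. */
        void onCompactionConfirmed(long checkpointId) {
            writtenBuckets.headMap(checkpointId, true).clear();
        }
    }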
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index b556011b..5e95c875 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -82,17 +82,28 @@ public class StoreSink implements Serializable {
return new StoreCompactOperator(table, initialCommitUser, compactPartitionSpec);
}
+ boolean isOverwrite = overwritePartition != null;
+ StoreSinkWrite.Provider writeProvider;
if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
- return new FullChangelogStoreWriteOperator(
- table,
- initialCommitUser,
- overwritePartition,
- logSinkFunction,
- table.options().changelogProducerFullCompactionTriggerInterval().toMillis());
+ long fullCompactionThresholdMs =
+ table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
+ writeProvider =
+ (table, context, ioManager) ->
+ new FullChangelogStoreSinkWrite(
+ table,
+ context,
+ initialCommitUser,
+ ioManager,
+ isOverwrite,
+ fullCompactionThresholdMs);
+ } else {
+ writeProvider =
+ (table, context, ioManager) ->
+ new StoreSinkWriteImpl(
+ table, context, initialCommitUser, ioManager, isOverwrite);
}
- return new StoreWriteOperator(
- table, initialCommitUser, overwritePartition, logSinkFunction);
+ return new StoreWriteOperator(table, logSinkFunction, writeProvider);
}
private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
new file mode 100644
index 00000000..fe3465d9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/** Helper interface of {@link StoreWriteOperator} for different types of table store sinks. */
+interface StoreSinkWrite {
+
+ SinkRecord write(RowData rowData) throws Exception;
+
+ SinkRecord toLogRecord(SinkRecord record);
+
+ void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+
+ List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException;
+
+ void snapshotState(StateSnapshotContext context) throws Exception;
+
+ void close() throws Exception;
+
+ @FunctionalInterface
+ interface Provider extends Serializable {
+
+ StoreSinkWrite provide(
+ FileStoreTable table, StateInitializationContext context, IOManager ioManager)
+ throws Exception;
+ }
+}
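Provider extends Serializable so that a lambda implementing it (as in StoreSink above) can be shipped with the job graph. A quick, self-contained round-trip check of that property — the nested interface here is a stand-in for StoreSinkWrite.Provider, and the captured commitUser is a made-up value:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class ProviderSerializationDemo {

        @FunctionalInterface
        interface Provider extends Serializable {
            String provide(String tableName) throws Exception;
        }

        public static void main(String[] args) throws Exception {
            String commitUser = "job-1234"; // captured values must themselves be serializable
            Provider provider = table -> "write[" + table + ", " + commitUser + "]";

            // Serialize and deserialize the lambda, as Flink does when deploying tasks.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(provider);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                Provider restored = (Provider) in.readObject();
                System.out.println(restored.provide("T")); // prints write[T, job-1234]
            }
        }
    }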
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
new file mode 100644
index 00000000..374616ec
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableWrite;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Default implementation of {@link StoreSinkWrite}. This writer is stateless. */
+public class StoreSinkWriteImpl implements StoreSinkWrite {
+
+ protected final FileStoreTable table;
+ protected final String commitUser;
+ protected final TableWrite write;
+
+ public StoreSinkWriteImpl(
+ FileStoreTable table,
+ StateInitializationContext context,
+ String initialCommitUser,
+ IOManager ioManager,
+ boolean isOverwrite)
+ throws Exception {
+ this.table = table;
+
+ // Each job can only have one user name and this name must be consistent across restarts.
+ // We cannot use the job id as the commit user name here because the user may change the
+ // job id by creating a savepoint, stopping the job, and then resuming from the savepoint.
+ commitUser =
+ StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class, initialCommitUser);
+
+ // State will be null if the upstream of this subtask has finished, but some other subtasks
+ // are still running.
+ // See the comments on StateUtils.getSingleValueFromState for more details.
+ //
+ // If the state is null, no new records will come. We only need to deal with checkpoints and
+ // close events.
+ if (commitUser == null) {
+ write = null;
+ } else {
+ write = table.newWrite(commitUser).withIOManager(ioManager).withOverwrite(isOverwrite);
+ }
+ }
+
+ @Override
+ public SinkRecord write(RowData rowData) throws Exception {
+ return write.write(rowData);
+ }
+
+ @Override
+ public SinkRecord toLogRecord(SinkRecord record) {
+ return write.toLogRecord(record);
+ }
+
+ @Override
+ public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+ throws Exception {
+ write.compact(partition, bucket, fullCompaction);
+ }
+
+ @Override
+ public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+ throws IOException {
+ List<Committable> committables = new ArrayList<>();
+ if (write != null) {
+ try {
+ for (FileCommittable committable :
+ write.prepareCommit(doCompaction, checkpointId)) {
+ committables.add(
+ new Committable(checkpointId, Committable.Kind.FILE, committable));
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return committables;
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (write != null) {
+ write.close();
+ }
+ }
+}
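The commit-user handling in the constructor above is the subtle part: the name is read from state on restart, falls back to initialCommitUser for fresh jobs, and a null result (finished upstream) disables the writer entirely. A rough sketch of that decision, assuming the semantics described in the comments — the real StateUtils.getSingleValueFromState helper is not part of this diff:

    import java.util.Iterator;

    /** Sketch of the commit-user resolution described above; the actual
     *  helper lives in StateUtils and is not shown in this commit. */
    class CommitUserResolution {
        static String resolve(boolean isRestored, Iterable<String> restoredState,
                              String initialCommitUser) {
            if (!isRestored) {
                // Fresh job: use (and later persist) the initial commit user.
                return initialCommitUser;
            }
            Iterator<String> it = restoredState.iterator();
            // No state restored for this subtask: its upstream has finished, so
            // the writer is disabled and only checkpoints/close are handled.
            return it.hasNext() ? it.next() : null;
        }
    }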
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index f3bfcb19..05696adc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -35,57 +35,41 @@ import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.TableWrite;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/** A {@link PrepareCommitOperator} to write records. */
public class StoreWriteOperator extends PrepareCommitOperator {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
protected final FileStoreTable table;
- /**
- * This commitUser is valid only for new jobs. After the job starts, this commitUser will be
- * recorded into the states of write and commit operators. When the job restarts, commitUser
- * will be recovered from states and this value is ignored.
- */
- private final String initialCommitUser;
+ @Nullable private final LogSinkFunction logSinkFunction;
- @Nullable private final Map<String, String> overwritePartition;
+ private final StoreSinkWrite.Provider storeSinkWriteProvider;
- @Nullable private final LogSinkFunction logSinkFunction;
+ private transient StoreSinkWrite write;
private transient SimpleContext sinkContext;
/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;
- @Nullable protected transient TableWrite write;
-
- /** This is the real commit user read from state. */
- @Nullable protected transient String commitUser;
-
@Nullable private transient LogWriteCallback logCallback;
public StoreWriteOperator(
FileStoreTable table,
- String initialCommitUser,
- @Nullable Map<String, String> overwritePartition,
- @Nullable LogSinkFunction logSinkFunction) {
+ @Nullable LogSinkFunction logSinkFunction,
+ StoreSinkWrite.Provider storeSinkWriteProvider) {
this.table = table;
- this.initialCommitUser = initialCommitUser;
- this.overwritePartition = overwritePartition;
this.logSinkFunction = logSinkFunction;
+ this.storeSinkWriteProvider = storeSinkWriteProvider;
}
@Override
@@ -102,31 +86,18 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
-
+ write =
+ storeSinkWriteProvider.provide(
+ table, context, getContainingTask().getEnvironment().getIOManager());
if (logSinkFunction != null) {
StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction);
}
-
- // each job can only have one user name and this name must be consistent across restarts
- // we cannot use job id as commit user name here because user may change job id by creating
- // a savepoint, stop the job and then resume from savepoint
- commitUser =
- StateUtils.getSingleValueFromState(
- context, "commit_user_state", String.class, initialCommitUser);
- // see comments of StateUtils.getSingleValueFromState for why commitUser may be null
- if (commitUser == null) {
- write = null;
- } else {
- write =
- table.newWrite(commitUser)
- .withIOManager(getContainingTask().getEnvironment().getIOManager())
- .withOverwrite(overwritePartition != null);
- }
}
@Override
public void open() throws Exception {
super.open();
+
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
FunctionUtils.openFunction(logSinkFunction, new Configuration());
@@ -138,6 +109,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
+
this.currentWatermark = mark.getTimestamp();
if (logSinkFunction != null) {
logSinkFunction.writeWatermark(
@@ -147,10 +119,6 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
- writeRecord(element);
- }
-
- protected SinkRecord writeRecord(StreamRecord<RowData> element) throws Exception {
sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
SinkRecord record;
@@ -165,13 +133,14 @@ public class StoreWriteOperator extends PrepareCommitOperator {
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
}
-
- return record;
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
+
+ write.snapshotState(context);
+
if (logSinkFunction != null) {
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), logSinkFunction);
@@ -181,6 +150,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
public void finish() throws Exception {
super.finish();
+
if (logSinkFunction != null) {
logSinkFunction.finish();
}
@@ -189,9 +159,8 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
public void close() throws Exception {
super.close();
- if (write != null) {
- write.close();
- }
+
+ write.close();
if (logSinkFunction != null) {
FunctionUtils.closeFunction(logSinkFunction);
@@ -219,18 +188,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Override
protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException {
- List<Committable> committables = new ArrayList<>();
- if (write != null) {
- try {
- for (FileCommittable committable :
- write.prepareCommit(doCompaction, checkpointId)) {
- committables.add(
- new Committable(checkpointId, Committable.Kind.FILE, committable));
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
+ List<Committable> committables = write.prepareCommit(doCompaction, checkpointId);
if (logCallback != null) {
try {