You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 10:44:56 UTC
[flink-table-store] 02/04: [FLINK-25630] Introduce RecordWriter
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit c94dfa8191bc39bf57e05b1e37840a3e0e4cd43d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:38:15 2022 +0800
[FLINK-25630] Introduce RecordWriter
---
.../table/store/file/mergetree/Increment.java | 63 ++++++++++++++++++++++
.../flink/table/store/file/utils/RecordWriter.java | 57 ++++++++++++++++++++
2 files changed, 120 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
new file mode 100644
index 0000000..c8afffe
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Incremental files for merge tree. It consists of two parts:
+ *
+ * <ul>
+ *   <li>New files: The new files generated in this snapshot cycle. They must be committed.
+ *   <li>Compact files: The {@link #compactBefore} files are compacted to {@link #compactAfter}
+ *       files in this snapshot cycle. The compaction is an optimization of files.
+ * </ul>
+ *
+ * <p>Every list is copied on construction and exposed read-only, so changes the caller makes
+ * afterwards to the lists it passed in are not reflected in this object.
+ */
+public class Increment {
+
+    private final List<SstFileMeta> newFiles;
+
+    private final List<SstFileMeta> compactBefore;
+
+    private final List<SstFileMeta> compactAfter;
+
+    /**
+     * Creates an increment from snapshots of the given lists.
+     *
+     * @param newFiles files newly generated in this snapshot cycle
+     * @param compactBefore files that were the input of compaction
+     * @param compactAfter files that compaction produced
+     * @throws NullPointerException if any of the lists is {@code null}
+     */
+    public Increment(
+            List<SstFileMeta> newFiles,
+            List<SstFileMeta> compactBefore,
+            List<SstFileMeta> compactAfter) {
+        this.newFiles = snapshot(newFiles);
+        this.compactBefore = snapshot(compactBefore);
+        this.compactAfter = snapshot(compactAfter);
+    }
+
+    /** Returns the files newly generated in this snapshot cycle, as an unmodifiable list. */
+    public List<SstFileMeta> newFiles() {
+        return newFiles;
+    }
+
+    /** Returns the files that were the input of compaction, as an unmodifiable list. */
+    public List<SstFileMeta> compactBefore() {
+        return compactBefore;
+    }
+
+    /** Returns the files that compaction produced, as an unmodifiable list. */
+    public List<SstFileMeta> compactAfter() {
+        return compactAfter;
+    }
+
+    /**
+     * Copies {@code files} before wrapping it. {@link Collections#unmodifiableList} alone is only
+     * a view, through which later changes to the caller's list would stay visible.
+     */
+    private static List<SstFileMeta> snapshot(List<SstFileMeta> files) {
+        return Collections.unmodifiableList(new java.util.ArrayList<>(files));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
new file mode 100644
index 0000000..4047301
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.List;
+
+/**
+ * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
+ * write yet un-staged data. The incremental files that are ready to commit are returned to the
+ * system by {@link #prepareCommit()}.
+ */
+public interface RecordWriter {
+
+ /**
+ * Add a key-value element to the writer.
+ *
+ * @param valueKind the {@link ValueKind} of the element
+ * @param key key of the element
+ * @param value value of the element
+ * @throws Exception if the element cannot be written
+ */
+ void write(ValueKind valueKind, RowData key, RowData value) throws Exception;
+
+ /**
+ * Prepare for a commit.
+ *
+ * @return Incremental files in this snapshot cycle
+ * @throws Exception if preparing the commit fails
+ */
+ Increment prepareCommit() throws Exception;
+
+ /**
+ * Sync the writer. The structures related to file reading and writing are not thread safe, and
+ * there are asynchronous threads inside the writer, so it should be synced before reading data.
+ *
+ * @throws Exception if syncing the writer fails
+ */
+ void sync() throws Exception;
+
+ /**
+ * Close this writer. The call will delete newly generated but not committed files.
+ *
+ * @return Deleted files.
+ * @throws Exception if closing the writer fails
+ */
+ List<SstFileMeta> close() throws Exception;
+}