You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 10:44:56 UTC

[flink-table-store] 02/04: [FLINK-25630] Introduce RecordWriter

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit c94dfa8191bc39bf57e05b1e37840a3e0e4cd43d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:38:15 2022 +0800

    [FLINK-25630] Introduce RecordWriter
---
 .../table/store/file/mergetree/Increment.java      | 63 ++++++++++++++++++++++
 .../flink/table/store/file/utils/RecordWriter.java | 57 ++++++++++++++++++++
 2 files changed, 120 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
new file mode 100644
index 0000000..c8afffe
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Incremental files for merge tree. It consists of two parts:
+ *
+ * <ul>
+ *   <li>New files: The new files generated in this snapshot cycle. They must be committed.
+ *   <li>Compact files: The {@link #compactBefore} files are compacted to {@link #compactAfter}
+ *       files in this snapshot cycle. The compaction is an optimization of files.
+ * </ul>
+ */
+public class Increment {
+
+    private final List<SstFileMeta> newFiles;
+
+    private final List<SstFileMeta> compactBefore;
+
+    private final List<SstFileMeta> compactAfter;
+
+    public Increment(
+            List<SstFileMeta> newFiles,
+            List<SstFileMeta> beCompacted,
+            List<SstFileMeta> compacted) {
+        this.newFiles = Collections.unmodifiableList(newFiles);
+        this.compactBefore = Collections.unmodifiableList(beCompacted);
+        this.compactAfter = Collections.unmodifiableList(compacted);
+    }
+
+    public List<SstFileMeta> newFiles() {
+        return newFiles;
+    }
+
+    public List<SstFileMeta> compactBefore() {
+        return compactBefore;
+    }
+
+    public List<SstFileMeta> compactAfter() {
+        return compactAfter;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
new file mode 100644
index 0000000..4047301
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.List;
+
+/**
+ * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
+ * write yet un-staged data. The incremental files ready to commit is returned to the system by the
+ * {@link #prepareCommit()}.
+ */
+public interface RecordWriter {
+
+    /** Add a key-value element to the writer. */
+    void write(ValueKind valueKind, RowData key, RowData value) throws Exception;
+
+    /**
+     * Prepare for a commit.
+     *
+     * @return Incremental files in this snapshot cycle
+     */
+    Increment prepareCommit() throws Exception;
+
+    /**
+     * Sync the writer. The structure related to file reading and writing is thread unsafe, there
+     * are asynchronous threads inside the writer, which should be synced before reading data.
+     */
+    void sync() throws Exception;
+
+    /**
+     * Close this writer, the call will delete newly generated but not committed files.
+     *
+     * @return Deleted files.
+     */
+    List<SstFileMeta> close() throws Exception;
+}