You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/17 05:39:32 UTC

[flink-table-store] branch master updated: [FLINK-25644] Introduce interfaces between file-table-store and flink connector sink

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ce970  [FLINK-25644] Introduce interfaces between file-table-store and flink connector sink
e1ce970 is described below

commit e1ce970c6c1663efd30955004c38f2cb76342324
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jan 17 13:39:29 2022 +0800

    [FLINK-25644] Introduce interfaces between file-table-store and flink connector sink
    
    This closes #9
---
 .../apache/flink/table/store/file/FileStore.java   |  38 +++++++
 .../store/file/manifest/ManifestCommittable.java   | 101 +++++++++++++++++++
 .../manifest/ManifestCommittableSerializer.java    | 111 +++++++++++++++++++++
 .../store/file/operation/FileStoreCommit.java      |  48 +++++++++
 .../store/file/operation/FileStoreExpire.java      |  26 +++++
 .../table/store/file/operation/FileStoreScan.java  |  57 +++++++++++
 .../table/store/file/operation/FileStoreWrite.java |  35 +++++++
 .../flink/table/store/file/operation/Lock.java     |  28 ++++++
 .../ManifestCommittableSerializerTest.java         |  81 +++++++++++++++
 9 files changed, 525 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
new file mode 100644
index 0000000..6ff03ea
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+
+import java.io.Serializable;
+
+/** File store interface. */
+public interface FileStore extends Serializable {
+
+    FileStoreWrite newWrite();
+
+    FileStoreCommit newCommit();
+
+    FileStoreExpire newExpire();
+
+    FileStoreScan newScan();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
new file mode 100644
index 0000000..a02fd75
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Manifest commit message. */
+public class ManifestCommittable {
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
+
+    public ManifestCommittable() {
+        this.newFiles = new HashMap<>();
+        this.compactBefore = new HashMap<>();
+        this.compactAfter = new HashMap<>();
+    }
+
+    public ManifestCommittable(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
+        this.newFiles = newFiles;
+        this.compactBefore = compactBefore;
+        this.compactAfter = compactAfter;
+    }
+
+    public void add(BinaryRowData partition, int bucket, Increment increment) {
+        addFiles(newFiles, partition, bucket, increment.newFiles());
+        addFiles(compactBefore, partition, bucket, increment.compactBefore());
+        addFiles(compactAfter, partition, bucket, increment.compactAfter());
+    }
+
+    private static void addFiles(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map,
+            BinaryRowData partition,
+            int bucket,
+            List<SstFileMeta> files) {
+        map.computeIfAbsent(partition, k -> new HashMap<>())
+                .computeIfAbsent(bucket, k -> new ArrayList<>())
+                .addAll(files);
+    }
+
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
+        return newFiles;
+    }
+
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore() {
+        return compactBefore;
+    }
+
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter() {
+        return compactAfter;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ManifestCommittable that = (ManifestCommittable) o;
+        return Objects.equals(newFiles, that.newFiles)
+                && Objects.equals(compactBefore, that.compactBefore)
+                && Objects.equals(compactAfter, that.compactAfter);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(newFiles, compactBefore, compactAfter);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
new file mode 100644
index 0000000..c999fc0
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** {@link SimpleVersionedSerializer} for {@link ManifestCommittable}. */
+public class ManifestCommittableSerializer
+        implements SimpleVersionedSerializer<ManifestCommittable> {
+
+    private final BinaryRowDataSerializer partSerializer;
+    private final SstFileMetaSerializer sstSerializer;
+
+    public ManifestCommittableSerializer(
+            RowType partitionType, SstFileMetaSerializer sstSerializer) {
+        this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
+        this.sstSerializer = sstSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(ManifestCommittable obj) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        serializeFiles(view, obj.newFiles());
+        serializeFiles(view, obj.compactBefore());
+        serializeFiles(view, obj.compactAfter());
+        return out.toByteArray();
+    }
+
+    private void serializeFiles(
+            DataOutputViewStreamWrapper view,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files)
+            throws IOException {
+        view.writeInt(files.size());
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entry : files.entrySet()) {
+            partSerializer.serialize(entry.getKey(), view);
+            view.writeInt(entry.getValue().size());
+            for (Map.Entry<Integer, List<SstFileMeta>> bucketEntry : entry.getValue().entrySet()) {
+                view.writeInt(bucketEntry.getKey());
+                view.writeInt(bucketEntry.getValue().size());
+                for (SstFileMeta file : bucketEntry.getValue()) {
+                    sstSerializer.serialize(file, view);
+                }
+            }
+        }
+    }
+
+    private Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> deserializeFiles(
+            DataInputDeserializer view) throws IOException {
+        int partNumber = view.readInt();
+        Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files = new HashMap<>();
+        for (int i = 0; i < partNumber; i++) {
+            BinaryRowData part = partSerializer.deserialize(view);
+            int bucketNumber = view.readInt();
+            Map<Integer, List<SstFileMeta>> bucketMap = new HashMap<>();
+            files.put(part, bucketMap);
+            for (int j = 0; j < bucketNumber; j++) {
+                int bucket = view.readInt();
+                int fileNumber = view.readInt();
+                List<SstFileMeta> fileMetas = new ArrayList<>();
+                bucketMap.put(bucket, fileMetas);
+                for (int k = 0; k < fileNumber; k++) {
+                    fileMetas.add(sstSerializer.deserialize(view));
+                }
+            }
+        }
+        return files;
+    }
+
+    @Override
+    public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
+        DataInputDeserializer view = new DataInputDeserializer(serialized);
+        return new ManifestCommittable(
+                deserializeFiles(view), deserializeFiles(view), deserializeFiles(view));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
new file mode 100644
index 0000000..912da92
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+import java.util.Map;
+
+/** Commit operation which provides commit and overwrite. */
+public interface FileStoreCommit {
+
+    /** With global lock. */
+    FileStoreCommit withLock(Lock lock);
+
+    /** Find out which manifest committable need to be retried when recovering from the failure. */
+    List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList);
+
+    /** Commit from manifest committable. */
+    void commit(ManifestCommittable committable, Map<String, String> properties);
+
+    /**
+     * Overwrite from manifest committable and partition.
+     *
+     * @param partition A single partition maps each partition key to a partition value. Depending
+     *     on the * user-defined statement, the partition might not include all partition keys.
+     */
+    void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
new file mode 100644
index 0000000..feefebd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+/** Expire operation which provides snapshots expire. */
+public interface FileStoreExpire {
+
+    /** Expire snapshots. */
+    void expire();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
new file mode 100644
index 0000000..52edea3
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Scan operation which produces a plan. */
+public interface FileStoreScan {
+
+    FileStoreScan withPartitionFilter(Predicate predicate);
+
+    FileStoreScan withKeyFilter(Predicate predicate);
+
+    FileStoreScan withValueFilter(Predicate predicate);
+
+    FileStoreScan withBucket(int bucket);
+
+    FileStoreScan withSnapshot(long snapshotId);
+
+    FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
+
+    /** Produce a {@link Plan}. */
+    Plan plan();
+
+    /** Result plan of this scan. */
+    interface Plan {
+
+        /** Snapshot id of this plan, return null if manifest list is specified. */
+        @Nullable
+        Long snapshotId();
+
+        /** Result {@link ManifestEntry} files. */
+        List<ManifestEntry> files();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
new file mode 100644
index 0000000..0e6b49f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.concurrent.ExecutorService;
+
+/** Write operation which provides {@link RecordWriter} creation. */
+public interface FileStoreWrite {
+
+    /** Create a {@link RecordWriter} from partition and bucket. */
+    RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+
+    /** Create an empty {@link RecordWriter} from partition and bucket. */
+    RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
new file mode 100644
index 0000000..9f28f83
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import java.util.concurrent.Callable;
+
+/** An interface that allows file store to use global lock to some transaction-related things. */
+public interface Lock {
+
+    /** Run with lock. */
+    <T> T runWithLock(Callable<T> callable) throws Exception;
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
new file mode 100644
index 0000000..2fe3241
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ManifestCommittableSerializer}. */
+public class ManifestCommittableSerializerTest {
+
+    private final AtomicInteger id = new AtomicInteger();
+
+    @Test
+    public void testCommittableSerDe() throws IOException {
+        SstFileMetaSerializer sstSerializer =
+                new SstFileMetaSerializer(RowType.of(new IntType()), RowType.of(new IntType()));
+        ManifestCommittableSerializer serializer =
+                new ManifestCommittableSerializer(RowType.of(new IntType()), sstSerializer);
+        ManifestCommittable committable = new ManifestCommittable();
+        addAndAssert(committable, row(0), 0);
+        addAndAssert(committable, row(0), 1);
+        addAndAssert(committable, row(1), 0);
+        addAndAssert(committable, row(1), 1);
+        byte[] serialized = serializer.serialize(committable);
+        assertThat(serializer.deserialize(1, serialized)).isEqualTo(committable);
+    }
+
+    private void addAndAssert(
+            ManifestCommittable committable, BinaryRowData partition, int bucket) {
+        Increment increment = newIncrement();
+        committable.add(partition, bucket, increment);
+        assertThat(committable.newFiles().get(partition).get(bucket))
+                .isEqualTo(increment.newFiles());
+        assertThat(committable.compactBefore().get(partition).get(bucket))
+                .isEqualTo(increment.compactBefore());
+        assertThat(committable.compactAfter().get(partition).get(bucket))
+                .isEqualTo(increment.compactAfter());
+    }
+
+    private Increment newIncrement() {
+        return new Increment(
+                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
+                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
+                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)));
+    }
+
+    public static SstFileMeta newFile(int name, int level) {
+        FieldStats[] stats = new FieldStats[] {new FieldStats(0, 1, 0)};
+        return new SstFileMeta(String.valueOf(name), 0, 1, row(0), row(0), stats, 0, 1, level);
+    }
+}