You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/17 05:39:32 UTC
[flink-table-store] branch master updated: [FLINK-25644] Introduce interfaces between file-table-store and flink connector sink
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e1ce970 [FLINK-25644] Introduce interfaces between file-table-store and flink connector sink
e1ce970 is described below
commit e1ce970c6c1663efd30955004c38f2cb76342324
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jan 17 13:39:29 2022 +0800
[FLINK-25644] Introduce interfaces between file-table-store and flink connector sink
This closes #9
---
.../apache/flink/table/store/file/FileStore.java | 38 +++++++
.../store/file/manifest/ManifestCommittable.java | 101 +++++++++++++++++++
.../manifest/ManifestCommittableSerializer.java | 111 +++++++++++++++++++++
.../store/file/operation/FileStoreCommit.java | 48 +++++++++
.../store/file/operation/FileStoreExpire.java | 26 +++++
.../table/store/file/operation/FileStoreScan.java | 57 +++++++++++
.../table/store/file/operation/FileStoreWrite.java | 35 +++++++
.../flink/table/store/file/operation/Lock.java | 28 ++++++
.../ManifestCommittableSerializerTest.java | 81 +++++++++++++++
9 files changed, 525 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
new file mode 100644
index 0000000..6ff03ea
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+
+import java.io.Serializable;
+
+/**
+ * File store interface: a {@link Serializable} factory for the operations of a file store.
+ *
+ * <ul>
+ * <li>{@link #newWrite()} returns a {@link FileStoreWrite}.
+ * <li>{@link #newCommit()} returns a {@link FileStoreCommit}.
+ * <li>{@link #newExpire()} returns a {@link FileStoreExpire}.
+ * <li>{@link #newScan()} returns a {@link FileStoreScan}.
+ * </ul>
+ *
+ * <p>NOTE(review): the {@code new*} names suggest a fresh operation object per call; this
+ * interface does not itself guarantee that, so confirm against the implementation.
+ */
+public interface FileStore extends Serializable {
+
+ FileStoreWrite newWrite();
+
+ FileStoreCommit newCommit();
+
+ FileStoreExpire newExpire();
+
+ FileStoreScan newScan();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
new file mode 100644
index 0000000..a02fd75
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Manifest commit message.
+ *
+ * <p>Holds three groups of {@link SstFileMeta} lists (new files, files before compaction and
+ * files after compaction). Every group is keyed first by partition and then by bucket.
+ */
+public class ManifestCommittable {
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
+
+    private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
+
+    /** Creates an empty committable backed by three fresh {@link HashMap}s. */
+    public ManifestCommittable() {
+        this(new HashMap<>(), new HashMap<>(), new HashMap<>());
+    }
+
+    /** Creates a committable that keeps references to the given maps (no copies are made). */
+    public ManifestCommittable(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
+        this.newFiles = newFiles;
+        this.compactBefore = compactBefore;
+        this.compactAfter = compactAfter;
+    }
+
+    /**
+     * Appends the three file lists of {@code increment} to the entries of the given partition
+     * and bucket, creating those entries when they do not exist yet.
+     */
+    public void add(BinaryRowData partition, int bucket, Increment increment) {
+        addFiles(newFiles, partition, bucket, increment.newFiles());
+        addFiles(compactBefore, partition, bucket, increment.compactBefore());
+        addFiles(compactAfter, partition, bucket, increment.compactAfter());
+    }
+
+    /** Appends {@code files} to {@code map[partition][bucket]}, creating missing levels. */
+    private static void addFiles(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map,
+            BinaryRowData partition,
+            int bucket,
+            List<SstFileMeta> files) {
+        Map<Integer, List<SstFileMeta>> buckets =
+                map.computeIfAbsent(partition, p -> new HashMap<>());
+        List<SstFileMeta> target = buckets.computeIfAbsent(bucket, b -> new ArrayList<>());
+        target.addAll(files);
+    }
+
+    /** Returns the internal map of new files (not a copy). */
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
+        return newFiles;
+    }
+
+    /** Returns the internal map of files before compaction (not a copy). */
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore() {
+        return compactBefore;
+    }
+
+    /** Returns the internal map of files after compaction (not a copy). */
+    public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter() {
+        return compactAfter;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (o.getClass() != getClass()) {
+            return false;
+        }
+        ManifestCommittable other = (ManifestCommittable) o;
+        if (!Objects.equals(newFiles, other.newFiles)) {
+            return false;
+        }
+        if (!Objects.equals(compactBefore, other.compactBefore)) {
+            return false;
+        }
+        return Objects.equals(compactAfter, other.compactAfter);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(newFiles, compactBefore, compactAfter);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
new file mode 100644
index 0000000..c999fc0
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link SimpleVersionedSerializer} for {@link ManifestCommittable}.
+ *
+ * <p>The three file groups of a committable (new files, compact-before files and compact-after
+ * files) are written one after another. Each group is encoded as: the number of partitions, then
+ * for every partition the partition row, the number of buckets, and for every bucket the bucket
+ * id, the number of files and the files themselves.
+ */
+public class ManifestCommittableSerializer
+        implements SimpleVersionedSerializer<ManifestCommittable> {
+
+    /** The only format version this serializer writes and is able to read. */
+    private static final int CURRENT_VERSION = 1;
+
+    private final BinaryRowDataSerializer partSerializer;
+    private final SstFileMetaSerializer sstSerializer;
+
+    /**
+     * Creates the serializer.
+     *
+     * @param partitionType row type of the partition; only its field count is used, to build the
+     *     partition row serializer
+     * @param sstSerializer serializer used for every {@link SstFileMeta}
+     */
+    public ManifestCommittableSerializer(
+            RowType partitionType, SstFileMetaSerializer sstSerializer) {
+        this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
+        this.sstSerializer = sstSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Serializes the three file groups in the order: new files, compact before, compact after.
+     *
+     * @param obj the committable to serialize
+     * @return the serialized bytes
+     * @throws IOException if writing a partition or a file meta fails
+     */
+    @Override
+    public byte[] serialize(ManifestCommittable obj) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        serializeFiles(view, obj.newFiles());
+        serializeFiles(view, obj.compactBefore());
+        serializeFiles(view, obj.compactAfter());
+        return out.toByteArray();
+    }
+
+    /** Writes one file group: counts are written before the elements they describe. */
+    private void serializeFiles(
+            DataOutputViewStreamWrapper view,
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files)
+            throws IOException {
+        view.writeInt(files.size());
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entry : files.entrySet()) {
+            partSerializer.serialize(entry.getKey(), view);
+            view.writeInt(entry.getValue().size());
+            for (Map.Entry<Integer, List<SstFileMeta>> bucketEntry : entry.getValue().entrySet()) {
+                view.writeInt(bucketEntry.getKey());
+                view.writeInt(bucketEntry.getValue().size());
+                for (SstFileMeta file : bucketEntry.getValue()) {
+                    sstSerializer.serialize(file, view);
+                }
+            }
+        }
+    }
+
+    /** Reads one file group written by {@link #serializeFiles}. */
+    private Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> deserializeFiles(
+            DataInputDeserializer view) throws IOException {
+        int partNumber = view.readInt();
+        Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files = new HashMap<>();
+        for (int i = 0; i < partNumber; i++) {
+            BinaryRowData part = partSerializer.deserialize(view);
+            int bucketNumber = view.readInt();
+            Map<Integer, List<SstFileMeta>> bucketMap = new HashMap<>();
+            files.put(part, bucketMap);
+            for (int j = 0; j < bucketNumber; j++) {
+                int bucket = view.readInt();
+                int fileNumber = view.readInt();
+                List<SstFileMeta> fileMetas = new ArrayList<>();
+                bucketMap.put(bucket, fileMetas);
+                for (int k = 0; k < fileNumber; k++) {
+                    fileMetas.add(sstSerializer.deserialize(view));
+                }
+            }
+        }
+        return files;
+    }
+
+    /**
+     * Deserializes a committable previously produced by {@link #serialize}.
+     *
+     * @param version the version the bytes were written with; must equal {@link #getVersion()}
+     * @param serialized the serialized bytes
+     * @return the deserialized committable
+     * @throws IOException if {@code version} is not supported or the bytes cannot be read
+     */
+    @Override
+    public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
+        // Fail fast instead of silently decoding bytes written in a different layout.
+        if (version != CURRENT_VERSION) {
+            throw new IOException(
+                    "Cannot deserialize ManifestCommittable: expected version "
+                            + CURRENT_VERSION
+                            + " but found "
+                            + version
+                            + ".");
+        }
+        DataInputDeserializer view = new DataInputDeserializer(serialized);
+        // The groups must be read in exactly the order serialize() wrote them.
+        Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles = deserializeFiles(view);
+        Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore = deserializeFiles(view);
+        Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter = deserializeFiles(view);
+        return new ManifestCommittable(newFiles, compactBefore, compactAfter);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
new file mode 100644
index 0000000..912da92
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+import java.util.Map;
+
+/** Commit operation which provides commit and overwrite. */
+public interface FileStoreCommit {
+
+ /**
+ * With global lock.
+ *
+ * @param lock the global {@link Lock} to use
+ * @return a {@link FileStoreCommit} for further calls (NOTE(review): presumably this instance;
+ * not specified by this interface)
+ */
+ FileStoreCommit withLock(Lock lock);
+
+ /**
+ * Find out which manifest committables need to be retried when recovering from a failure.
+ *
+ * @param committableList the candidate committables
+ * @return the committables that need to be retried
+ */
+ List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList);
+
+ /**
+ * Commit from manifest committable.
+ *
+ * @param committable the committable to commit
+ * @param properties string key/value pairs passed along with the commit (NOTE(review): their
+ * meaning is not specified by this interface; confirm against the implementation)
+ */
+ void commit(ManifestCommittable committable, Map<String, String> properties);
+
+ /**
+ * Overwrite from manifest committable and partition.
+ *
+ * @param partition A single partition maps each partition key to a partition value. Depending
+ * on the user-defined statement, the partition might not include all partition keys.
+ * @param committable the committable to overwrite with
+ * @param properties string key/value pairs passed along with the overwrite (NOTE(review):
+ * their meaning is not specified by this interface; confirm against the implementation)
+ */
+ void overwrite(
+ Map<String, String> partition,
+ ManifestCommittable committable,
+ Map<String, String> properties);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
new file mode 100644
index 0000000..feefebd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+/** Expire operation which provides snapshot expiration. */
+public interface FileStoreExpire {
+
+ /**
+ * Expire snapshots.
+ *
+ * <p>NOTE(review): which snapshots qualify for expiration is not specified by this interface;
+ * confirm against the implementation.
+ */
+ void expire();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
new file mode 100644
index 0000000..52edea3
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Scan operation which produces a {@link Plan}.
+ *
+ * <p>The {@code with*} methods configure the scan and each return a {@link FileStoreScan}:
+ *
+ * <ul>
+ * <li>{@link #withPartitionFilter(Predicate)}, {@link #withKeyFilter(Predicate)} and
+ * {@link #withValueFilter(Predicate)} take a {@link Predicate}; judging by their names they
+ * filter on partition, key and value respectively.
+ * <li>{@link #withBucket(int)} takes a bucket number.
+ * <li>{@link #withSnapshot(long)} takes a snapshot id.
+ * <li>{@link #withManifestList(List)} takes a list of {@link ManifestFileMeta}.
+ * </ul>
+ *
+ * <p>NOTE(review): whether these methods return {@code this} or a new instance, and how
+ * {@link #withSnapshot(long)} and {@link #withManifestList(List)} interact when both are called,
+ * is not specified here; confirm against the implementation.
+ */
+public interface FileStoreScan {
+
+ FileStoreScan withPartitionFilter(Predicate predicate);
+
+ FileStoreScan withKeyFilter(Predicate predicate);
+
+ FileStoreScan withValueFilter(Predicate predicate);
+
+ FileStoreScan withBucket(int bucket);
+
+ FileStoreScan withSnapshot(long snapshotId);
+
+ FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
+
+ /** Produce a {@link Plan} for this scan. */
+ Plan plan();
+
+ /** Result plan of this scan. */
+ interface Plan {
+
+ /** Snapshot id of this plan, or {@code null} if a manifest list was specified. */
+ @Nullable
+ Long snapshotId();
+
+ /** Result files of this plan, as a list of {@link ManifestEntry}. */
+ List<ManifestEntry> files();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
new file mode 100644
index 0000000..0e6b49f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.concurrent.ExecutorService;
+
+/** Write operation which provides {@link RecordWriter} creation. */
+public interface FileStoreWrite {
+
+ /**
+ * Create a {@link RecordWriter} from partition and bucket.
+ *
+ * @param partition the partition the writer is created for
+ * @param bucket the bucket the writer is created for
+ * @param compactExecutor executor service handed to the writer (NOTE(review): presumably used
+ * to run compaction, judging by the name; confirm against the implementation)
+ * @return the created {@link RecordWriter}
+ */
+ RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+
+ /**
+ * Create an empty {@link RecordWriter} from partition and bucket.
+ *
+ * <p>NOTE(review): what exactly "empty" means, and how this differs from
+ * {@link #createWriter}, is not specified by this interface; confirm against the implementation.
+ *
+ * @param partition the partition the writer is created for
+ * @param bucket the bucket the writer is created for
+ * @param compactExecutor executor service handed to the writer (NOTE(review): presumably used
+ * to run compaction, judging by the name; confirm against the implementation)
+ * @return the created {@link RecordWriter}
+ */
+ RecordWriter createEmptyWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
new file mode 100644
index 0000000..9f28f83
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import java.util.concurrent.Callable;
+
+/**
+ * An interface that allows the file store to use a global lock to do some transaction-related
+ * things.
+ */
+public interface Lock {
+
+ /**
+ * Run with lock.
+ *
+ * @param callable the action to run with the lock
+ * @param <T> the result type of {@code callable}
+ * @return a value of type {@code T} (NOTE(review): presumably the result of {@code callable};
+ * confirm against the implementation)
+ * @throws Exception declared by this method; presumably whatever {@code callable} or the lock
+ * handling throws
+ */
+ <T> T runWithLock(Callable<T> callable) throws Exception;
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
new file mode 100644
index 0000000..2fe3241
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ManifestCommittableSerializer}. */
+public class ManifestCommittableSerializerTest {
+
+    /** Source of distinct file names; every created file takes the next value. */
+    private final AtomicInteger id = new AtomicInteger();
+
+    /** Round-trips a committable holding two partitions with two buckets each. */
+    @Test
+    public void testCommittableSerDe() throws IOException {
+        ManifestCommittableSerializer serializer =
+                new ManifestCommittableSerializer(
+                        RowType.of(new IntType()),
+                        new SstFileMetaSerializer(
+                                RowType.of(new IntType()), RowType.of(new IntType())));
+
+        ManifestCommittable committable = new ManifestCommittable();
+        for (int partition = 0; partition < 2; partition++) {
+            for (int bucket = 0; bucket < 2; bucket++) {
+                addAndAssert(committable, row(partition), bucket);
+            }
+        }
+
+        byte[] bytes = serializer.serialize(committable);
+        ManifestCommittable restored = serializer.deserialize(1, bytes);
+        assertThat(restored).isEqualTo(committable);
+    }
+
+    /** Adds a fresh increment and checks that all three groups expose exactly its files. */
+    private void addAndAssert(
+            ManifestCommittable committable, BinaryRowData partition, int bucket) {
+        Increment added = newIncrement();
+        committable.add(partition, bucket, added);
+
+        assertThat(committable.newFiles().get(partition).get(bucket))
+                .isEqualTo(added.newFiles());
+        assertThat(committable.compactBefore().get(partition).get(bucket))
+                .isEqualTo(added.compactBefore());
+        assertThat(committable.compactAfter().get(partition).get(bucket))
+                .isEqualTo(added.compactAfter());
+    }
+
+    /** Builds an increment with two uniquely named level-0 files in each of its three lists. */
+    private Increment newIncrement() {
+        return new Increment(
+                Arrays.asList(nextFile(), nextFile()),
+                Arrays.asList(nextFile(), nextFile()),
+                Arrays.asList(nextFile(), nextFile()));
+    }
+
+    /** Creates a level-0 file named after the next id. */
+    private SstFileMeta nextFile() {
+        return newFile(id.incrementAndGet(), 0);
+    }
+
+    /** Creates a file meta whose file name is {@code name} and whose level is {@code level}. */
+    public static SstFileMeta newFile(int name, int level) {
+        FieldStats[] stats = {new FieldStats(0, 1, 0)};
+        String fileName = String.valueOf(name);
+        return new SstFileMeta(fileName, 0, 1, row(0), row(0), stats, 0, 1, level);
+    }
+}