Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/20 03:23:21 UTC
[flink-table-store] branch master updated: [FLINK-25680] Introduce Table Store Flink Sink
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new beaf4ad [FLINK-25680] Introduce Table Store Flink Sink
beaf4ad is described below
commit beaf4adb2b3aae44df3334c4a562a927afefd160
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jan 20 11:23:18 2022 +0800
[FLINK-25680] Introduce Table Store Flink Sink
This closes #10
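    For context, a minimal sketch of how the new sink can be wired into a job
    (the FileStore instance, lock factory, row type and field indices below are
    illustrative assumptions, not part of this commit):

        // Hypothetical wiring; `fileStore` and `lockFactory` are supplied by the caller.
        RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
        StoreSink sink =
                new StoreSink(
                        ObjectIdentifier.of("my_catalog", "my_database", "my_table"),
                        fileStore,             // FileStore implementation backing the table
                        rowType,               // schema of the incoming RowData
                        new int[] {0},         // partition field indices
                        new int[] {1},         // primary key field indices
                        2,                     // number of buckets
                        lockFactory,           // nullable CatalogLock.Factory
                        null);                 // null: streaming append; non-null: overwrite
        dataStream.sinkTo(sink);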
---
flink-table-store-connector/pom.xml | 105 +++++++++
.../store/connector/sink/LocalCommittable.java | 71 +++++++
.../connector/sink/LocalCommittableSerializer.java | 89 ++++++++
.../table/store/connector/sink/SinkRecord.java | 71 +++++++
.../store/connector/sink/SinkRecordConverter.java | 56 +++++
.../store/connector/sink/StoreGlobalCommitter.java | 102 +++++++++
.../table/store/connector/sink/StoreSink.java | 140 ++++++++++++
.../store/connector/sink/StoreSinkWriter.java | 165 +++++++++++++++
.../store/connector/utils/ProjectionUtils.java | 63 ++++++
.../sink/LocalCommittableSerializerTest.java | 49 +++++
.../table/store/connector/sink/StoreSinkTest.java | 235 +++++++++++++++++++++
.../table/store/connector/sink/TestFileStore.java | 202 ++++++++++++++++++
.../manifest/ManifestCommittableSerializer.java | 5 +-
.../table/store/file/mergetree/Increment.java | 20 ++
.../ManifestCommittableSerializerTest.java | 20 +-
pom.xml | 1 +
16 files changed, 1381 insertions(+), 13 deletions(-)
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
new file mode 100644
index 0000000..896a793
--- /dev/null
+++ b/flink-table-store-connector/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-connector</artifactId>
+ <name>Flink Table Store : Connector</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- flink dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
new file mode 100644
index 0000000..c6bbeca
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+
+import java.util.Objects;
+
+/** Committable produced by a single sink writer for one (partition, bucket), carrying its file {@link Increment}. */
+public class LocalCommittable {
+
+ private final BinaryRowData partition;
+
+ private final int bucket;
+
+ private final Increment increment;
+
+ public LocalCommittable(BinaryRowData partition, int bucket, Increment increment) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.increment = increment;
+ }
+
+ public BinaryRowData partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public Increment increment() {
+ return increment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LocalCommittable that = (LocalCommittable) o;
+ return bucket == that.bucket
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(increment, that.increment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, increment);
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
new file mode 100644
index 0000000..a8c698d
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link SimpleVersionedSerializer} for {@link LocalCommittable}. */
+public class LocalCommittableSerializer implements SimpleVersionedSerializer<LocalCommittable> {
+
+ private final BinaryRowDataSerializer partSerializer;
+ private final SstFileMetaSerializer sstSerializer;
+
+ public LocalCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+ this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
+ this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(LocalCommittable obj) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
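+ // Wire layout: partition row, bucket id, then the increment's three file lists.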
+ partSerializer.serialize(obj.partition(), view);
+ view.writeInt(obj.bucket());
+ serializeFiles(view, obj.increment().newFiles());
+ serializeFiles(view, obj.increment().compactBefore());
+ serializeFiles(view, obj.increment().compactAfter());
+ return out.toByteArray();
+ }
+
+ private void serializeFiles(DataOutputViewStreamWrapper view, List<SstFileMeta> files)
+ throws IOException {
+ view.writeInt(files.size());
+ for (SstFileMeta file : files) {
+ sstSerializer.serialize(file, view);
+ }
+ }
+
+ private List<SstFileMeta> deserializeFiles(DataInputDeserializer view) throws IOException {
+ int fileNumber = view.readInt();
+ List<SstFileMeta> files = new ArrayList<>(fileNumber);
+ for (int i = 0; i < fileNumber; i++) {
+ files.add(sstSerializer.deserialize(view));
+ }
+ return files;
+ }
+
+ @Override
+ public LocalCommittable deserialize(int version, byte[] serialized) throws IOException {
+ DataInputDeserializer view = new DataInputDeserializer(serialized);
+ return new LocalCommittable(
+ partSerializer.deserialize(view),
+ view.readInt(),
+ new Increment(
+ deserializeFiles(view), deserializeFiles(view), deserializeFiles(view)));
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java
new file mode 100644
index 0000000..dae6e2f
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.types.RowKind;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A sink record contains partition, bucket, row kind, key and value information. */
+public class SinkRecord {
+
+ private final BinaryRowData partition;
+
+ private final int bucket;
+
+ private final RowKind rowKind;
+
+ private final BinaryRowData key;
+
+ private final RowData row;
+
+ public SinkRecord(
+ BinaryRowData partition, int bucket, RowKind rowKind, BinaryRowData key, RowData row) {
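+ // The original row kind is carried separately; partition, key and row must be normalized to INSERT.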
+ checkArgument(partition.getRowKind() == RowKind.INSERT);
+ checkArgument(key.getRowKind() == RowKind.INSERT);
+ checkArgument(row.getRowKind() == RowKind.INSERT);
+ this.partition = partition;
+ this.bucket = bucket;
+ this.rowKind = rowKind;
+ this.key = key;
+ this.row = row;
+ }
+
+ public BinaryRowData partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public BinaryRowData key() {
+ return key;
+ }
+
+ public RowData row() {
+ return row;
+ }
+
+ public RowKind rowKind() {
+ return rowKind;
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java
new file mode 100644
index 0000000..b5302d6
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.connector.utils.ProjectionUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+/** Converts {@link RowData} into {@link SinkRecord}, computing the partition, key and bucket. */
+public class SinkRecordConverter {
+
+ private final int numBucket;
+
+ private final RowDataSerializer rowSerializer;
+
+ private final Projection<RowData, BinaryRowData> partProjection;
+
+ private final Projection<RowData, BinaryRowData> keyProjection;
+
+ public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
+ this.numBucket = numBucket;
+ this.rowSerializer = new RowDataSerializer(inputType);
+ this.partProjection = ProjectionUtils.newProjection(inputType, partitions);
+ this.keyProjection = ProjectionUtils.newProjection(inputType, keys);
+ }
+
+ public SinkRecord convert(RowData row) {
+ RowKind rowKind = row.getRowKind();
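+ // Normalize to INSERT before projecting; the original kind travels in the SinkRecord.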
+ row.setRowKind(RowKind.INSERT);
+ BinaryRowData partition = partProjection.apply(row);
+ BinaryRowData key = keyProjection.apply(row);
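+ // Key-less tables hash the whole row; the hash is then mapped to a non-negative bucket id.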
+ int hash = key.getArity() == 0 ? rowSerializer.toBinaryRow(row).hashCode() : key.hashCode();
+ int bucket = Math.abs(hash % numBucket);
+ return new SinkRecord(partition, bucket, rowKind, key, row);
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
new file mode 100644
index 0000000..e5dde92
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** {@link GlobalCommitter} for the dynamic store. */
+public class StoreGlobalCommitter
+ implements GlobalCommitter<LocalCommittable, ManifestCommittable> {
+
+ private final FileStoreCommit fileStoreCommit;
+
+ private final FileStoreExpire fileStoreExpire;
+
+ @Nullable private final CatalogLock lock;
+
+ @Nullable private final Map<String, String> overwritePartition;
+
+ public StoreGlobalCommitter(
+ FileStoreCommit fileStoreCommit,
+ FileStoreExpire fileStoreExpire,
+ @Nullable CatalogLock lock,
+ @Nullable Map<String, String> overwritePartition) {
+ this.fileStoreCommit = fileStoreCommit;
+ this.fileStoreExpire = fileStoreExpire;
+ this.lock = lock;
+ this.overwritePartition = overwritePartition;
+ }
+
+ @Override
+ public List<ManifestCommittable> filterRecoveredCommittables(
+ List<ManifestCommittable> globalCommittables) {
+ return fileStoreCommit.filterCommitted(globalCommittables);
+ }
+
+ @Override
+ public ManifestCommittable combine(List<LocalCommittable> committables) {
+ ManifestCommittable globalCommittable = new ManifestCommittable();
+ committables.forEach(
+ committable ->
+ globalCommittable.add(
+ committable.partition(),
+ committable.bucket(),
+ committable.increment()));
+ return globalCommittable;
+ }
+
+ @Override
+ public List<ManifestCommittable> commit(List<ManifestCommittable> globalCommittables) {
+ Map<String, String> properties = new HashMap<>();
+ if (overwritePartition == null) {
+ for (ManifestCommittable committable : globalCommittables) {
+ fileStoreCommit.commit(committable, properties);
+ }
+ } else {
+ checkArgument(
+ globalCommittables.size() == 1, "overwrite is only supported in batch mode.");
+ fileStoreCommit.overwrite(overwritePartition, globalCommittables.get(0), properties);
+ }
+ fileStoreExpire.expire();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void endOfInput() {}
+
+ @Override
+ public void close() throws Exception {
+ if (lock != null) {
+ lock.close();
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
new file mode 100644
index 0000000..123f073
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.connector.utils.ProjectionUtils.project;
+
+/** {@link Sink} for the dynamic store. */
+public class StoreSink implements Sink<RowData, LocalCommittable, Void, ManifestCommittable> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ObjectIdentifier tableIdentifier;
+
+ private final FileStore fileStore;
+
+ private final RowType rowType;
+
+ private final int[] partitions;
+
+ private final int[] keys;
+
+ private final int numBucket;
+
+ @Nullable private final CatalogLock.Factory lockFactory;
+
+ @Nullable private final Map<String, String> overwritePartition;
+
+ public StoreSink(
+ ObjectIdentifier tableIdentifier,
+ FileStore fileStore,
+ RowType rowType,
+ int[] partitions,
+ int[] keys,
+ int numBucket,
+ @Nullable CatalogLock.Factory lockFactory,
+ @Nullable Map<String, String> overwritePartition) {
+ this.tableIdentifier = tableIdentifier;
+ this.fileStore = fileStore;
+ this.rowType = rowType;
+ this.partitions = partitions;
+ this.keys = keys;
+ this.numBucket = numBucket;
+ this.lockFactory = lockFactory;
+ this.overwritePartition = overwritePartition;
+ }
+
+ @Override
+ public StoreSinkWriter createWriter(InitContext initContext, List<Void> list) {
+ SinkRecordConverter recordConverter =
+ new SinkRecordConverter(numBucket, rowType, partitions, keys);
+ return new StoreSinkWriter(
+ fileStore.newWrite(), recordConverter, overwritePartition != null);
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<Committer<LocalCommittable>> createCommitter() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalCommitter<LocalCommittable, ManifestCommittable>>
+ createGlobalCommitter() {
+ FileStoreCommit commit = fileStore.newCommit();
+ CatalogLock lock;
+ if (lockFactory == null) {
+ lock = null;
+ } else {
+ lock = lockFactory.create();
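+ // Adapt the catalog lock to the file store's Lock abstraction, scoped to this table's identifier.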
+ commit.withLock(
+ new Lock() {
+ @Override
+ public <T> T runWithLock(Callable<T> callable) throws Exception {
+ return lock.runWithLock(
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName(),
+ callable);
+ }
+ });
+ }
+ return Optional.of(
+ new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, overwritePartition));
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<LocalCommittable>> getCommittableSerializer() {
+ return Optional.of(
+ new LocalCommittableSerializer(
+ project(rowType, partitions), project(rowType, keys), rowType));
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<ManifestCommittable>>
+ getGlobalCommittableSerializer() {
+ return Optional.of(
+ new ManifestCommittableSerializer(
+ project(rowType, partitions), project(rowType, keys), rowType));
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
new file mode 100644
index 0000000..453b245
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** A {@link SinkWriter} for the dynamic store. */
+public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Void> {
+
+ private final FileStoreWrite fileStoreWrite;
+
+ private final SinkRecordConverter recordConverter;
+
+ private final boolean overwrite;
+
+ private final ExecutorService compactExecutor;
+
+ private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+
+ public StoreSinkWriter(
+ FileStoreWrite fileStoreWrite, SinkRecordConverter recordConverter, boolean overwrite) {
+ this.fileStoreWrite = fileStoreWrite;
+ this.recordConverter = recordConverter;
+ this.overwrite = overwrite;
+ this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.writers = new HashMap<>();
+ }
+
+ private RecordWriter getWriter(BinaryRowData partition, int bucket) {
+ Map<Integer, RecordWriter> buckets = writers.get(partition);
+ if (buckets == null) {
+ buckets = new HashMap<>();
+ writers.put(partition.copy(), buckets);
+ }
+ return buckets.computeIfAbsent(
+ bucket,
+ k ->
+ overwrite
+ ? fileStoreWrite.createEmptyWriter(
+ partition, bucket, compactExecutor)
+ : fileStoreWrite.createWriter(partition, bucket, compactExecutor));
+ }
+
+ @Override
+ public void write(RowData rowData, Context context) throws IOException {
+ RowKind rowKind = rowData.getRowKind();
+ SinkRecord record = recordConverter.convert(rowData);
+ RecordWriter writer = getWriter(record.partition(), record.bucket());
+ try {
+ writeToFileStore(writer, record);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ rowData.setRowKind(rowKind);
+ }
+
+ private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
+ switch (record.rowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ if (record.key().getArity() == 0) {
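+ // Key-less table: the row itself acts as the key and the value is a +1 count.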
+ writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1));
+ } else {
+ writer.write(ValueKind.ADD, record.key(), record.row());
+ }
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ if (record.key().getArity() == 0) {
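+ // Key-less table: retract by adding a -1 count instead of writing a DELETE.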
+ writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1));
+ } else {
+ writer.write(ValueKind.DELETE, record.key(), record.row());
+ }
+ break;
+ }
+ }
+
+ @Override
+ public List<LocalCommittable> prepareCommit(boolean flush) throws IOException {
+ try {
+ return prepareCommit();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private List<LocalCommittable> prepareCommit() throws Exception {
+ List<LocalCommittable> committables = new ArrayList<>();
+ for (BinaryRowData partition : writers.keySet()) {
+ Map<Integer, RecordWriter> buckets = writers.get(partition);
+ for (Integer bucket : buckets.keySet()) {
+ RecordWriter writer = buckets.get(bucket);
+ LocalCommittable committable =
+ new LocalCommittable(partition, bucket, writer.prepareCommit());
+ committables.add(committable);
+
+ // Close and remove writers that produced no new files since the last commit.
+ // Without this cleanup, writers would accumulate indefinitely, e.g. for
+ // yesterday's partition that will never be written to again.
+ if (committable.increment().newFiles().isEmpty()) {
+ closeWriter(writer);
+ buckets.remove(bucket);
+ }
+ }
+
+ if (buckets.isEmpty()) {
+ writers.remove(partition);
+ }
+ }
+ return committables;
+ }
+
+ private void closeWriter(RecordWriter writer) throws Exception {
+ writer.sync();
+ writer.close();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.compactExecutor.shutdownNow();
+ for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
+ for (RecordWriter writer : bucketWriters.values()) {
+ closeWriter(writer);
+ }
+ }
+ writers.clear();
+ }
+
+ @VisibleForTesting
+ Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
+ return writers;
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java
new file mode 100644
index 0000000..5c135b8
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.utils;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utils for {@link Projection}. */
+public class ProjectionUtils {
+
+ public static final Projection<RowData, BinaryRowData> EMPTY_PROJECTION =
+ input -> BinaryRowDataUtil.EMPTY_ROW;
+
+ public static RowType project(RowType inputType, int[] mapping) {
+ List<RowType.RowField> fields = inputType.getFields();
+ return new RowType(
+ Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
+ }
+
+ public static Projection<RowData, BinaryRowData> newProjection(
+ RowType inputType, int[] mapping) {
+ if (mapping.length == 0) {
+ return EMPTY_PROJECTION;
+ }
+
+ @SuppressWarnings("unchecked")
+ Projection<RowData, BinaryRowData> projection =
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "Projection",
+ inputType,
+ project(inputType, mapping),
+ mapping)
+ .newInstance(Thread.currentThread().getContextClassLoader());
+ return projection;
+ }
+}
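    A quick illustration of the utilities above (field indices are hypothetical;
    an empty mapping yields EMPTY_PROJECTION, anything else is code-generated):

        RowType input = RowType.of(new IntType(), new IntType(), new IntType());
        Projection<RowData, BinaryRowData> partProjection =
                ProjectionUtils.newProjection(input, new int[] {0});
        BinaryRowData partition = partProjection.apply(GenericRowData.of(1, 2, 3)); // contains only field 0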
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
new file mode 100644
index 0000000..6fd2ac3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LocalCommittableSerializer}. */
+public class LocalCommittableSerializerTest {
+
+ @Test
+ public void test() throws IOException {
+ LocalCommittableSerializer serializer =
+ new LocalCommittableSerializer(
+ RowType.of(new IntType()),
+ RowType.of(new IntType()),
+ RowType.of(new IntType()));
+ Increment increment = randomIncrement();
+ LocalCommittable committable = new LocalCommittable(row(0), 1, increment);
+ LocalCommittable newCommittable =
+ serializer.deserialize(1, serializer.serialize(committable));
+ assertThat(newCommittable).isEqualTo(committable);
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
new file mode 100644
index 0000000..e69ec17
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link StoreSink}. */
+public class StoreSinkTest {
+
+ private final ObjectIdentifier identifier =
+ ObjectIdentifier.of("my_catalog", "my_database", "my_table");
+
+ private final TestFileStore fileStore = new TestFileStore();
+
+ private final TestLock lock = new TestLock();
+
+ private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
+
+ @Test
+ public void testChangelogs() throws Exception {
+ StoreSink sink = newSink(null);
+ writeAndCommit(
+ sink,
+ GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
+ GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
+ GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 7, 5),
+ GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
+ assertThat(fileStore.committedFiles.get(row(1)).get(1))
+ .isEqualTo(Collections.singletonList("DELETE-key-0-value-1/0/1"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(1))
+ .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+ }
+
+ @Test
+ public void testNoKeyChangelogs() throws Exception {
+ StoreSink sink =
+ new StoreSink(
+ identifier,
+ fileStore,
+ rowType,
+ new int[] {0},
+ new int[] {},
+ 2,
+ () -> lock,
+ new HashMap<>());
+ writeAndCommit(
+ sink,
+ GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
+ GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
+ GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
+ GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
+ assertThat(fileStore.committedFiles.get(row(1)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-1/0/1-value--1"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-0/4/5-value-1"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(1))
+ .isEqualTo(Arrays.asList("ADD-key-0/0/1-value-1", "ADD-key-0/2/3-value--1"));
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ StoreSink sink = newSink(null);
+ writeAndAssert(sink);
+
+ writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+ assertThat(fileStore.committedFiles.get(row(1)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Arrays.asList("ADD-key-2-value-0/2/3", "ADD-key-8-value-0/8/9"));
+ }
+
+ @Test
+ public void testOverwrite() throws Exception {
+ StoreSink sink = newSink(new HashMap<>());
+ writeAndAssert(sink);
+
+ writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+ assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
+ assertThat(fileStore.committedFiles.get(row(1)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+ }
+
+ @Test
+ public void testOverwritePartition() throws Exception {
+ HashMap<String, String> partition = new HashMap<>();
+ partition.put("part", "0");
+ StoreSink sink = newSink(partition);
+ writeAndAssert(sink);
+
+ writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+ assertThat(fileStore.committedFiles.get(row(1)).get(1))
+ .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+ assertThat(fileStore.committedFiles.get(row(1)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+ }
+
+ private void writeAndAssert(StoreSink sink) throws Exception {
+ writeAndCommit(
+ sink,
+ GenericRowData.of(0, 0, 1),
+ GenericRowData.of(0, 2, 3),
+ GenericRowData.of(0, 7, 5),
+ GenericRowData.of(1, 0, 1));
+ assertThat(fileStore.committedFiles.get(row(1)).get(1))
+ .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(0))
+ .isEqualTo(Collections.singletonList("ADD-key-2-value-0/2/3"));
+ assertThat(fileStore.committedFiles.get(row(0)).get(1))
+ .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+ }
+
+ private void writeAndCommit(StoreSink sink, RowData... rows) throws Exception {
+ commit(sink, write(sink, rows));
+ }
+
+ private List<LocalCommittable> write(StoreSink sink, RowData... rows) throws Exception {
+ StoreSinkWriter writer = sink.createWriter(null, null);
+ for (RowData row : rows) {
+ writer.write(row, null);
+ }
+
+ List<LocalCommittable> localCommittables = writer.prepareCommit(true);
+ Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
+ assertThat(writers.size()).isGreaterThan(0);
+
+ writer.close();
+ writers.forEach(
+ (part, map) ->
+ map.forEach(
+ (bucket, recordWriter) -> {
+ TestRecordWriter testWriter = (TestRecordWriter) recordWriter;
+ assertThat(testWriter.synced).isTrue();
+ assertThat(testWriter.closed).isTrue();
+ }));
+ return localCommittables;
+ }
+
+ private void commit(StoreSink sink, List<LocalCommittable> localCommittables) throws Exception {
+ StoreGlobalCommitter committer = (StoreGlobalCommitter) sink.createGlobalCommitter().get();
+ ManifestCommittable committable = committer.combine(localCommittables);
+
+ fileStore.expired = false;
+ lock.locked = false;
+ committer.commit(Collections.singletonList(committable));
+ assertThat(fileStore.expired).isTrue();
+ assertThat(lock.locked).isTrue();
+
+ assertThat(
+ committer
+ .filterRecoveredCommittables(Collections.singletonList(committable))
+ .size())
+ .isEqualTo(0);
+
+ lock.closed = false;
+ committer.close();
+ assertThat(lock.closed).isTrue();
+ }
+
+ private StoreSink newSink(Map<String, String> overwritePartition) {
+ return new StoreSink(
+ identifier,
+ fileStore,
+ rowType,
+ new int[] {0},
+ new int[] {1},
+ 2,
+ () -> lock,
+ overwritePartition);
+ }
+
+ private class TestLock implements CatalogLock {
+
+ private boolean locked = false;
+
+ private boolean closed = false;
+
+ @Override
+ public <T> T runWithLock(String database, String table, Callable<T> callable)
+ throws Exception {
+ assertThat(database).isEqualTo(identifier.getDatabaseName());
+ assertThat(table).isEqualTo(identifier.getObjectName());
+ locked = true;
+ return callable.call();
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
new file mode 100644
index 0000000..febe35d
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+
+/** A {@link FileStore} implementation for tests. */
+public class TestFileStore implements FileStore {
+
+ public final Set<ManifestCommittable> committed = new HashSet<>();
+
+ public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
+
+ public boolean expired = false;
+
+ @Override
+ public FileStoreWrite newWrite() {
+ return new FileStoreWrite() {
+ @Override
+ public RecordWriter createWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ TestRecordWriter writer = new TestRecordWriter();
+ writer.records.addAll(
+ committedFiles
+ .computeIfAbsent(partition, k -> new HashMap<>())
+ .computeIfAbsent(bucket, k -> new ArrayList<>()));
+ committedFiles.get(partition).remove(bucket);
+ return writer;
+ }
+
+ @Override
+ public RecordWriter createEmptyWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ return new TestRecordWriter();
+ }
+ };
+ }
+
+ @Override
+ public FileStoreCommit newCommit() {
+ return new TestCommit();
+ }
+
+ @Override
+ public FileStoreExpire newExpire() {
+ return () -> expired = true;
+ }
+
+ @Override
+ public FileStoreScan newScan() {
+ throw new UnsupportedOperationException();
+ }
+
+ static class TestRecordWriter implements RecordWriter {
+
+ final List<String> records = new ArrayList<>();
+
+ boolean synced = false;
+
+ boolean closed = false;
+
+ private String rowToString(RowData row) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < row.getArity(); i++) {
+ if (i != 0) {
+ builder.append("/");
+ }
+ builder.append(row.getInt(i));
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public void write(ValueKind valueKind, RowData key, RowData value) {
+ records.add(
+ valueKind.toString()
+ + "-key-"
+ + rowToString(key)
+ + "-value-"
+ + rowToString(value));
+ }
+
+ @Override
+ public Increment prepareCommit() {
+ List<SstFileMeta> newFiles =
+ records.stream()
+ .map(s -> new SstFileMeta(s, 0, 0, null, null, null, 0, 0, 0))
+ .collect(Collectors.toList());
+ return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
+ }
+
+ @Override
+ public void sync() {
+ synced = true;
+ }
+
+ @Override
+ public List<SstFileMeta> close() {
+ closed = true;
+ return Collections.emptyList();
+ }
+ }
+
+ class TestCommit implements FileStoreCommit {
+
+ Lock lock;
+
+ @Override
+ public FileStoreCommit withLock(Lock lock) {
+ this.lock = lock;
+ return this;
+ }
+
+ @Override
+ public List<ManifestCommittable> filterCommitted(
+ List<ManifestCommittable> committableList) {
+ return committableList.stream()
+ .filter(c -> !committed.contains(c))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void commit(ManifestCommittable committable, Map<String, String> properties) {
+ try {
+ lock.runWithLock(() -> committed.add(committable));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ committable
+ .newFiles()
+ .forEach(
+ (part, bMap) ->
+ bMap.forEach(
+ (bucket, files) -> {
+ List<String> committed =
+ committedFiles
+ .computeIfAbsent(
+ part, k -> new HashMap<>())
+ .computeIfAbsent(
+ bucket,
+ k -> new ArrayList<>());
+ files.stream()
+ .map(SstFileMeta::fileName)
+ .forEach(committed::add);
+ }));
+ }
+
+ @Override
+ public void overwrite(
+ Map<String, String> partition,
+ ManifestCommittable committable,
+ Map<String, String> properties) {
+ if (partition.isEmpty()) {
+ committedFiles.clear();
+ } else {
+ BinaryRowData partRow = row(Integer.parseInt(partition.get("part")));
+ committedFiles.remove(partRow);
+ }
+ commit(committable, properties);
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index c999fc0..65e18b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -41,10 +41,9 @@ public class ManifestCommittableSerializer
private final BinaryRowDataSerializer partSerializer;
private final SstFileMetaSerializer sstSerializer;
- public ManifestCommittableSerializer(
- RowType partitionType, SstFileMetaSerializer sstSerializer) {
+ public ManifestCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
- this.sstSerializer = sstSerializer;
+ this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index c8afffe..0e29652 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
/**
* Incremental files for merge tree. It consists of two parts:
@@ -60,4 +61,23 @@ public class Increment {
public List<SstFileMeta> compactAfter() {
return compactAfter;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Increment increment = (Increment) o;
+ return Objects.equals(newFiles, increment.newFiles)
+ && Objects.equals(compactBefore, increment.compactBefore)
+ && Objects.equals(compactAfter, increment.compactAfter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(newFiles, compactBefore, compactAfter);
+ }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 2fe3241..4ebbf85 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -38,14 +37,15 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link ManifestCommittableSerializer}. */
public class ManifestCommittableSerializerTest {
- private final AtomicInteger id = new AtomicInteger();
+ private static final AtomicInteger ID = new AtomicInteger();
@Test
public void testCommittableSerDe() throws IOException {
- SstFileMetaSerializer sstSerializer =
- new SstFileMetaSerializer(RowType.of(new IntType()), RowType.of(new IntType()));
ManifestCommittableSerializer serializer =
- new ManifestCommittableSerializer(RowType.of(new IntType()), sstSerializer);
+ new ManifestCommittableSerializer(
+ RowType.of(new IntType()),
+ RowType.of(new IntType()),
+ RowType.of(new IntType()));
ManifestCommittable committable = new ManifestCommittable();
addAndAssert(committable, row(0), 0);
addAndAssert(committable, row(0), 1);
@@ -57,7 +57,7 @@ public class ManifestCommittableSerializerTest {
private void addAndAssert(
ManifestCommittable committable, BinaryRowData partition, int bucket) {
- Increment increment = newIncrement();
+ Increment increment = randomIncrement();
committable.add(partition, bucket, increment);
assertThat(committable.newFiles().get(partition).get(bucket))
.isEqualTo(increment.newFiles());
@@ -67,11 +67,11 @@ public class ManifestCommittableSerializerTest {
.isEqualTo(increment.compactAfter());
}
- private Increment newIncrement() {
+ public static Increment randomIncrement() {
return new Increment(
- Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
- Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
- Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)));
+ Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
+ Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
+ Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
}
public static SstFileMeta newFile(int name, int level) {
diff --git a/pom.xml b/pom.xml
index 5443b52..5046318 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@ under the License.
<modules>
<module>flink-table-store-core</module>
+ <module>flink-table-store-connector</module>
</modules>
<properties>