Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/16 03:46:03 UTC
[flink-table-store] branch master updated: [FLINK-25994] Implement FileStoreExpire
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec208b [FLINK-25994] Implement FileStoreExpire
6ec208b is described below
commit 6ec208bcc180471f141cfa250b92adc6aa95c2b2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Feb 16 11:45:59 2022 +0800
[FLINK-25994] Implement FileStoreExpire
This closes #17
---
.../apache/flink/table/store/file/Snapshot.java | 13 +-
.../store/file/operation/FileStoreCommitImpl.java | 8 +-
.../store/file/operation/FileStoreExpireImpl.java | 166 ++++++++++++++++
.../store/file/utils/FileStorePathFactory.java | 24 +++
.../store/file/operation/FileStoreExpireTest.java | 216 +++++++++++++++++++++
.../store/file/operation/FileStoreScanTest.java | 53 +----
.../store/file/operation/OperationTestUtils.java | 88 ++++++++-
7 files changed, 516 insertions(+), 52 deletions(-)
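At a glance: Snapshot gains a timeMillis field recording when it was committed, FileStoreCommitImpl stamps that field at commit time, and the new FileStoreExpireImpl combines a snapshot-count limit with a time limit to delete old snapshots along with the sst and manifest files that only they reference. The test utilities are refactored so that FileStoreScanTest and the new FileStoreExpireTest share a common write-and-commit helper.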
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 77f79e6..e02fa0d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -39,6 +39,7 @@ public class Snapshot {
private static final String FIELD_COMMIT_USER = "commitUser";
private static final String FIELD_COMMIT_DIGEST = "commitDigest";
private static final String FIELD_COMMIT_KIND = "commitKind";
+ private static final String FIELD_TIME_MILLIS = "timeMillis";
@JsonProperty(FIELD_ID)
private final long id;
@@ -56,18 +57,23 @@ public class Snapshot {
@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
+ @JsonProperty(FIELD_TIME_MILLIS)
+ private final long timeMillis;
+
@JsonCreator
public Snapshot(
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
- @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind) {
+ @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
+ @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
this.id = id;
this.manifestList = manifestList;
this.commitUser = commitUser;
this.commitDigest = commitDigest;
this.commitKind = commitKind;
+ this.timeMillis = timeMillis;
}
@JsonGetter(FIELD_ID)
@@ -95,6 +101,11 @@ public class Snapshot {
return commitKind;
}
+ @JsonGetter(FIELD_TIME_MILLIS)
+ public long timeMillis() {
+ return timeMillis;
+ }
+
public String toJson() {
try {
return new ObjectMapper().writeValueAsString(this);
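For illustration, a hypothetical round-trip with the new field (not part of the commit; the CommitKind constant name is an assumption, as the enum's values are not shown in this diff):

    // Hypothetical example: snapshots now record the wall-clock time
    // at which they were committed.
    Snapshot snapshot =
            new Snapshot(
                    1L,
                    "manifest-list-0",
                    "user-1",
                    "someDigest",
                    Snapshot.CommitKind.APPEND, // assumed constant
                    System.currentTimeMillis());
    String json = snapshot.toJson();
    // json now contains a "timeMillis" entry, e.g.
    // {"id":1,...,"commitKind":"APPEND","timeMillis":1644986759000}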
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 465c2ec..7c56acd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -243,7 +243,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// prepare snapshot file
manifestListName = manifestList.write(newMetas);
newSnapshot =
- new Snapshot(newSnapshotId, manifestListName, commitUser, hash, commitKind);
+ new Snapshot(
+ newSnapshotId,
+ manifestListName,
+ commitUser,
+ hash,
+ commitKind,
+ System.currentTimeMillis());
FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
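Note: the timestamp is taken with System.currentTimeMillis() on the committing process, so the time-based expiration introduced below implicitly assumes that the clocks of committers and of the process running expiration are reasonably aligned.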
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
new file mode 100644
index 0000000..e7a03d9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default implementation of {@link FileStoreExpire}. It retains only the latest snapshots that
+ * fall within both a count limit and a time limit; snapshots exceeding either limit are expired.
+ *
+ * <p>NOTE: This implementation will keep at least one snapshot so that users will not accidentally
+ * clear all snapshots.
+ */
+public class FileStoreExpireImpl implements FileStoreExpire {
+
+ // snapshots exceeding either constraint will be expired
+ private final int numRetained;
+ private final long millisRetained;
+
+ private final FileStorePathFactory pathFactory;
+ private final ManifestList manifestList;
+ private final FileStoreScan scan;
+
+ public FileStoreExpireImpl(
+ int numRetained,
+ long millisRetained,
+ FileStorePathFactory pathFactory,
+ ManifestList.Factory manifestListFactory,
+ FileStoreScan scan) {
+ this.numRetained = numRetained;
+ this.millisRetained = millisRetained;
+ this.pathFactory = pathFactory;
+ this.manifestList = manifestListFactory.create();
+ this.scan = scan;
+ }
+
+ @Override
+ public void expire() {
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ // no snapshot, nothing to expire
+ return;
+ }
+
+ long currentMillis = System.currentTimeMillis();
+
+ // find earliest snapshot to retain
+ for (long id = Math.max(latestSnapshotId - numRetained + 1, Snapshot.FIRST_SNAPSHOT_ID);
+ id <= latestSnapshotId;
+ id++) {
+ Path snapshotPath = pathFactory.toSnapshotPath(id);
+ try {
+ if (snapshotPath.getFileSystem().exists(snapshotPath)
+ && currentMillis - Snapshot.fromPath(snapshotPath).timeMillis()
+ <= millisRetained) {
+ // this snapshot is within the time threshold; snapshots are committed in
+ // id order, so all later snapshots are within the threshold as well
+ expireUntil(id);
+ return;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to determine if snapshot #" + id + " still exists", e);
+ }
+ }
+
+ // no snapshot satisfies the retention constraints; expire all but the latest one
+ expireUntil(latestSnapshotId);
+ }
+
+ private void expireUntil(long exclusiveId) {
+ if (exclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+ // fast exit
+ return;
+ }
+
+ Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(exclusiveId));
+
+ // an sst file used only by the snapshots to expire, and not by the earliest retained
+ // snapshot, can be deleted, because each sst file is added and deleted exactly once
+ Set<Path> sstInUse = new HashSet<>();
+ FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+ new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+ for (ManifestEntry entry : scan.withSnapshot(exclusiveId).plan().files()) {
+ SstPathFactory sstPathFactory =
+ sstPathFactoryCache.getSstPathFactory(entry.partition(), entry.bucket());
+ sstInUse.add(sstPathFactory.toPath(entry.file().fileName()));
+ }
+
+ // same as with sst files: each manifest is added and deleted exactly once
+ Set<ManifestFileMeta> manifestsInUse =
+ new HashSet<>(manifestList.read(exclusiveSnapshot.manifestList()));
+
+ Set<Path> sstToDelete = new HashSet<>();
+ Set<String> manifestsToDelete = new HashSet<>();
+
+ for (long id = exclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+ Path snapshotPath = pathFactory.toSnapshotPath(id);
+ try {
+ if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+ // only the latest snapshots are retained; since this snapshot no longer
+ // exists, all snapshots before it must already have been removed
+ break;
+ }
+
+ Snapshot toExpire = Snapshot.fromPath(snapshotPath);
+
+ for (ManifestEntry entry : scan.withSnapshot(toExpire.id()).plan().files()) {
+ SstPathFactory sstPathFactory =
+ sstPathFactoryCache.getSstPathFactory(
+ entry.partition(), entry.bucket());
+ Path sstPath = sstPathFactory.toPath(entry.file().fileName());
+ if (!sstInUse.contains(sstPath)) {
+ sstToDelete.add(sstPath);
+ }
+ }
+
+ for (ManifestFileMeta manifest : manifestList.read(toExpire.manifestList())) {
+ if (!manifestsInUse.contains(manifest)) {
+ manifestsToDelete.add(manifest.fileName());
+ }
+ }
+
+ manifestList.delete(toExpire.manifestList());
+ FileUtils.deleteOrWarn(snapshotPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to expire snapshot #" + id, e);
+ }
+ }
+
+ for (Path sst : sstToDelete) {
+ FileUtils.deleteOrWarn(sst);
+ }
+ for (String manifestName : manifestsToDelete) {
+ FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+ }
+ }
+}
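A minimal usage sketch of the new class (the wiring mirrors OperationTestUtils.createExpire later in this commit; the pathFactory, manifestListFactory and scan objects are assumed to be constructed as elsewhere in the code base):

    // Expire snapshots older than one day, keeping at most 10; the latest
    // snapshot is always kept regardless of these limits.
    FileStoreExpire expire =
            new FileStoreExpireImpl(
                    10, // numRetained
                    24 * 60 * 60 * 1000L, // millisRetained
                    pathFactory,
                    manifestListFactory,
                    scan);
    expire.expire(); // idempotent, see the "expire twice" check in the tests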
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 679e367..ab6a384 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -152,4 +154,26 @@ public class FileStorePathFactory {
public String uuid() {
return uuid;
}
+
+ /** Cache for storing {@link SstPathFactory}s. */
+ public static class SstPathFactoryCache {
+
+ private final FileStorePathFactory pathFactory;
+ private final Map<BinaryRowData, Map<Integer, SstPathFactory>> cache;
+
+ public SstPathFactoryCache(FileStorePathFactory pathFactory) {
+ this.pathFactory = pathFactory;
+ this.cache = new HashMap<>();
+ }
+
+ public SstPathFactory getSstPathFactory(BinaryRowData partition, int bucket) {
+ return cache.computeIfAbsent(partition, p -> new HashMap<>())
+ .computeIfAbsent(
+ bucket, b -> pathFactory.createSstPathFactory(partition, bucket));
+ }
+ }
}
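For reference, this is how FileStoreExpireImpl above consumes the cache, condensed:

    FileStorePathFactory.SstPathFactoryCache cache =
            new FileStorePathFactory.SstPathFactoryCache(pathFactory);
    for (ManifestEntry entry : scan.withSnapshot(snapshotId).plan().files()) {
        SstPathFactory sstPathFactory =
                cache.getSstPathFactory(entry.partition(), entry.bucket());
        Path sstPath = sstPathFactory.toPath(entry.file().fileName());
        // ... collect sstPath into the in-use / to-delete sets ...
    }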
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
new file mode 100644
index 0000000..16a7ab1
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreExpireImpl}. */
+public class FileStoreExpireTest {
+
+ private final FileFormat avro =
+ FileFormat.fromIdentifier(
+ FileStoreExpireTest.class.getClassLoader(), "avro", new Configuration());
+
+ private TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+ private FileStorePathFactory pathFactory;
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ gen = new TestKeyValueGenerator();
+ pathFactory =
+ OperationTestUtils.createPathFactory(
+ TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ Path root = new Path(tempDir.toString());
+ root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ }
+
+ @Test
+ public void testNoSnapshot() {
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+ expire.expire();
+
+ assertThat(pathFactory.latestSnapshotId()).isNull();
+ }
+
+ @Test
+ public void testNotEnoughSnapshots() throws Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(2, allData, snapshotPositions);
+ int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(
+ latestSnapshotId + 1, Long.MAX_VALUE, avro, pathFactory);
+ expire.expire();
+
+ FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ for (int i = 1; i <= latestSnapshotId; i++) {
+ assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+ assertSnapshot(i, allData, snapshotPositions);
+ }
+ }
+
+ @Test
+ public void testNeverExpire() throws Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(5, allData, snapshotPositions);
+ int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(
+ Integer.MAX_VALUE, Long.MAX_VALUE, avro, pathFactory);
+ expire.expire();
+
+ FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ for (int i = 1; i <= latestSnapshotId; i++) {
+ assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+ assertSnapshot(i, allData, snapshotPositions);
+ }
+ }
+
+ @Test
+ public void testKeepAtLeastOneSnapshot() throws Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(3, allData, snapshotPositions);
+ int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ Thread.sleep(100);
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(Integer.MAX_VALUE, 1, avro, pathFactory);
+ expire.expire();
+
+ FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ for (int i = 1; i < latestSnapshotId; i++) {
+ assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isFalse();
+ }
+ assertThat(fs.exists(pathFactory.toSnapshotPath(latestSnapshotId))).isTrue();
+ assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+ }
+
+ @Test
+ public void testExpireWithNumber() throws Exception {
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ commit(ThreadLocalRandom.current().nextInt(5) + 1, allData, snapshotPositions);
+ expire.expire();
+
+ int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ for (int j = 1; j <= latestSnapshotId; j++) {
+ if (j > latestSnapshotId - 3) {
+ assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isTrue();
+ assertSnapshot(j, allData, snapshotPositions);
+ } else {
+ assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isFalse();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testExpireWithTime() throws Exception {
+ FileStoreExpire expire =
+ OperationTestUtils.createExpire(Integer.MAX_VALUE, 1000, avro, pathFactory);
+
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(5, allData, snapshotPositions);
+ Thread.sleep(1500);
+ commit(5, allData, snapshotPositions);
+ long expireMillis = System.currentTimeMillis();
+ // expire twice to check for idempotence
+ expire.expire();
+ expire.expire();
+
+ int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ for (int i = 1; i <= latestSnapshotId; i++) {
+ Path snapshotPath = pathFactory.toSnapshotPath(i);
+ if (fs.exists(snapshotPath)) {
+ assertThat(Snapshot.fromPath(snapshotPath).timeMillis())
+ .isBetween(expireMillis - 1000, expireMillis);
+ assertSnapshot(i, allData, snapshotPositions);
+ }
+ }
+ }
+
+ private void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
+ throws Exception {
+ for (int i = 0; i < numCommits; i++) {
+ int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
+ List<KeyValue> data = new ArrayList<>();
+ for (int j = 0; j < numRecords; j++) {
+ data.add(gen.next());
+ }
+ allData.addAll(data);
+ List<Snapshot> snapshots =
+ OperationTestUtils.writeAndCommitData(
+ data, gen::getPartition, kv -> 0, avro, pathFactory);
+ for (int j = 0; j < snapshots.size(); j++) {
+ snapshotPositions.add(allData.size());
+ }
+ }
+ }
+
+ private void assertSnapshot(
+ int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
+ throws Exception {
+ Map<BinaryRowData, BinaryRowData> expected =
+ OperationTestUtils.toKvMap(
+ allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+ List<KeyValue> actualKvs =
+ OperationTestUtils.readKvsFromManifestEntries(
+ OperationTestUtils.createScan(avro, pathFactory)
+ .withSnapshot(snapshotId)
+ .plan()
+ .files(),
+ avro,
+ pathFactory);
+ gen.sort(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ assertThat(actual).isEqualTo(expected);
+ }
+}
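The timing in testExpireWithTime is what exercises the time constraint: the first batch of commits is followed by a 1500 ms pause, so with millisRetained = 1000 only snapshots from the second batch can survive, and the final loop asserts that every surviving snapshot's timeMillis lies within the retention window.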
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 251c853..8a12007 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -25,12 +25,9 @@ import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
@@ -40,14 +37,10 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -205,48 +198,10 @@ public class FileStoreScanTest {
}
private Snapshot writeData(List<KeyValue> kvs) throws Exception {
- FileStoreWrite write = OperationTestUtils.createWrite(avro, pathFactory);
- Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
- for (KeyValue kv : kvs) {
- BinaryRowData partition = gen.getPartition(kv);
- int bucket = getBucket(kv);
- writers.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
- .compute(
- bucket,
- (b, w) -> {
- if (w == null) {
- ExecutorService service = Executors.newSingleThreadExecutor();
- return write.createWriter(partition, bucket, service);
- } else {
- return w;
- }
- })
- .write(kv.valueKind(), kv.key(), kv.value());
- }
-
- FileStoreCommit commit = OperationTestUtils.createCommit(avro, pathFactory);
- ManifestCommittable committable = new ManifestCommittable();
- for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
- writers.entrySet()) {
- for (Map.Entry<Integer, RecordWriter> entryWithBucket :
- entryWithPartition.getValue().entrySet()) {
- Increment increment = entryWithBucket.getValue().prepareCommit();
- committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
- }
- }
- commit.commit(committable, Collections.emptyMap());
- writers.values().stream()
- .flatMap(m -> m.values().stream())
- .forEach(
- w -> {
- try {
- w.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- return Snapshot.fromPath(pathFactory.toSnapshotPath(pathFactory.latestSnapshotId()));
+ List<Snapshot> snapshots =
+ OperationTestUtils.writeAndCommitData(
+ kvs, gen::getPartition, this::getBucket, avro, pathFactory);
+ return snapshots.get(snapshots.size() - 1);
}
private int getBucket(KeyValue kv) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index bce303b..ef93d26 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -25,25 +25,33 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.mergetree.sst.SstFile;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
/** Utils for operation tests. */
public class OperationTestUtils {
@@ -106,11 +114,24 @@ public class OperationTestUtils {
new DeduplicateAccumulator(),
fileFormat,
pathFactory,
- OperationTestUtils.getMergeTreeOptions(false));
+ getMergeTreeOptions(false));
return new FileStoreWriteImpl(
pathFactory, mergeTreeFactory, createScan(fileFormat, pathFactory));
}
+ public static FileStoreExpire createExpire(
+ int numRetained,
+ long millisRetained,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory) {
+ return new FileStoreExpireImpl(
+ numRetained,
+ millisRetained,
+ pathFactory,
+ createManifestListFactory(fileFormat, pathFactory),
+ createScan(fileFormat, pathFactory));
+ }
+
public static FileStorePathFactory createPathFactory(String scheme, String root) {
return new FileStorePathFactory(
new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
@@ -132,6 +153,71 @@ public class OperationTestUtils {
TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
}
+ public static List<Snapshot> writeAndCommitData(
+ List<KeyValue> kvs,
+ Function<KeyValue, BinaryRowData> partitionCalculator,
+ Function<KeyValue, Integer> bucketCalculator,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory)
+ throws Exception {
+ FileStoreWrite write = createWrite(fileFormat, pathFactory);
+ Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+ for (KeyValue kv : kvs) {
+ BinaryRowData partition = partitionCalculator.apply(kv);
+ int bucket = bucketCalculator.apply(kv);
+ writers.computeIfAbsent(partition, p -> new HashMap<>())
+ .computeIfAbsent(
+ bucket,
+ b -> {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ return write.createWriter(partition, bucket, service);
+ })
+ .write(kv.valueKind(), kv.key(), kv.value());
+ }
+
+ FileStoreCommit commit = createCommit(fileFormat, pathFactory);
+ ManifestCommittable committable = new ManifestCommittable();
+ for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+ writers.entrySet()) {
+ for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ Increment increment = entryWithBucket.getValue().prepareCommit();
+ committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+ }
+ }
+
+ Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
+ if (snapshotIdBeforeCommit == null) {
+ snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+ }
+ commit.commit(committable, Collections.emptyMap());
+ Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
+ if (snapshotIdAfterCommit == null) {
+ snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+ }
+
+ writers.values().stream()
+ .flatMap(m -> m.values().stream())
+ .forEach(
+ w -> {
+ try {
+ w.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<Snapshot> snapshots = new ArrayList<>();
+ for (long id = snapshotIdBeforeCommit + 1; id <= snapshotIdAfterCommit; id++) {
+ snapshots.add(Snapshot.fromPath(pathFactory.toSnapshotPath(id)));
+ }
+ return snapshots;
+ }
+
public static List<KeyValue> readKvsFromSnapshot(
long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
throws IOException {