Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/24 03:18:45 UTC
[flink-table-store] branch master updated: [FLINK-28812] Support to read compacted snapshot only
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 638c34b3 [FLINK-28812] Support to read compacted snapshot only
638c34b3 is described below
commit 638c34b39ab0b57b14cef9b050c749de7b1bf7d3
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 24 11:18:41 2022 +0800
[FLINK-28812] Support to read compacted snapshot only
This closes #351
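In short: this commit introduces a new table option, 'read.compacted' (Boolean, default false). When it is enabled, a scan plans from the latest snapshot whose commit kind is COMPACT rather than the latest snapshot overall; if no such snapshot exists yet, the scan is empty. A minimal usage sketch, assuming a Flink TableEnvironment and an already registered table store table T (the dynamic-option hint mirrors the ITCases in this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Catalog/table registration for T elided; see the ITCases below.
    // Read only data that has gone through a full compaction:
    tEnv.executeSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */").print();

The option can also be set in the table's WITH clause instead of per-query via the hint.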
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../store/connector/source/FileStoreSource.java | 5 +-
.../store/connector/BatchFileStoreITCase.java | 6 ++
.../connector/FullCompactionFileStoreITCase.java | 75 ++++++++++++++++++++++
.../org/apache/flink/table/store/CoreOptions.java | 10 +++
.../file/operation/AbstractFileStoreScan.java | 12 +++-
.../table/store/file/operation/FileStoreScan.java | 2 +
.../table/store/file/utils/SnapshotManager.java | 19 ++++++
.../table/store/table/source/DataTableScan.java | 5 ++
.../store/table/source/SnapshotEnumerator.java | 5 +-
.../table/store/table/FileStoreTableTestBase.java | 55 ++++++++++++++++
11 files changed, 197 insertions(+), 3 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e6efa021..1f7401ad 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -200,6 +200,12 @@
<td>String</td>
<td>The default partition name in case the dynamic partition column value is null/empty string.</td>
</tr>
+ <tr>
+ <td><h5>read.compacted</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to read only the latest compacted snapshot.</td>
+ </tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 573e5f98..33ead92a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -78,7 +78,10 @@ public class FileStoreSource extends FlinkSource {
Long snapshotId;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- DataFilePlan plan = isContinuous ? SnapshotEnumerator.startup(scan) : scan.plan();
+ DataFilePlan plan =
+ isContinuous
+ ? SnapshotEnumerator.startup(scan)
+ : scan.withReadCompacted(table.options().readCompacted()).plan();
snapshotId = plan.snapshotId;
splits = new FileStoreSourceSplitGenerator().createSplits(plan);
} else {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 60a48760..6355859f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -43,4 +43,10 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1");
assertThat(batchSql("SELECT * FROM T")).isEmpty();
}
+
+ @Test
+ public void testReadCompactedEmpty() {
+ batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */")).isEmpty();
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
new file mode 100644
index 00000000..b17f43c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for the full compaction file store. */
+public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
+ private final String table = "T";
+
+ @Override
+ protected List<String> ddl() {
+ String options =
+ " WITH('changelog-producer'='full-compaction', 'changelog-producer.compaction-interval' = '1s')";
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ + options);
+ }
+
+ @Test
+ public void testStreamingRead() throws Exception {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+ batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+ batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+ }
+
+ @Test
+ public void testReadCompacted() throws Exception {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM %s /*+ OPTIONS('read.compacted'='true') */", table));
+
+ batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+ batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */"))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 584bc2af..096d0b18 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -169,6 +169,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to skip compaction on write.");
+ public static final ConfigOption<Boolean> READ_COMPACTED =
+ ConfigOptions.key("read.compacted")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to read only the latest compacted snapshot.");
+
public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
ConfigOptions.key("source.split.target-size")
.memoryType()
@@ -544,6 +550,10 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_COMPACTION_SKIP);
}
+ public boolean readCompacted() {
+ return options.get(READ_COMPACTED);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
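The option can also be set programmatically through the new CoreOptions.READ_COMPACTED constant; a short sketch, assuming the usual Flink Configuration (this mirrors FileStoreTableTestBase below):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    Configuration conf = new Configuration();
    // Equivalent to 'read.compacted' = 'true' in SQL options:
    conf.set(CoreOptions.READ_COMPACTED, true);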
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index bd8af8aa..551548bc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -75,6 +75,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private List<ManifestFileMeta> specifiedManifests = null;
private boolean isIncremental = false;
private Integer specifiedLevel = null;
+ private boolean readCompacted = false;
public AbstractFileStoreScan(
RowType partitionType,
@@ -175,13 +176,22 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withReadCompacted(boolean readCompacted) {
+ this.readCompacted = readCompacted;
+ return this;
+ }
+
@Override
public Plan plan() {
List<ManifestFileMeta> manifests = specifiedManifests;
Long snapshotId = specifiedSnapshotId;
if (manifests == null) {
if (snapshotId == null) {
- snapshotId = snapshotManager.latestSnapshotId();
+ snapshotId =
+ readCompacted
+ ? snapshotManager.latestCompactedSnapshotId()
+ : snapshotManager.latestSnapshotId();
}
if (snapshotId == null) {
manifests = Collections.emptyList();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index df3289a0..820baec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -51,6 +51,8 @@ public interface FileStoreScan {
FileStoreScan withLevel(int level);
+ FileStoreScan withReadCompacted(boolean readCompacted);
+
/** Produce a {@link Plan}. */
Plan plan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 515c32ed..8400a0cb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -90,6 +90,25 @@ public class SnapshotManager {
}
}
+ public @Nullable Long latestCompactedSnapshotId() {
+ Long latestId = latestSnapshotId();
+ Long earliestId = earliestSnapshotId();
+ if (latestId == null || earliestId == null) {
+ return null;
+ }
+
+ for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) {
+ if (snapshotExists(snapshotId)) {
+ Snapshot snapshot = snapshot(snapshotId);
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ return snapshot.id();
+ }
+ }
+ }
+
+ return null;
+ }
+
/**
* Returns a snapshot earlier than the timestamp millis. A non-existent snapshot may be returned
* if all snapshots are later than the timestamp millis.
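Note the null contract here: if no snapshot of commit kind COMPACT exists yet, latestCompactedSnapshotId() returns null and AbstractFileStoreScan falls through to an empty manifest list, which is why testReadCompactedEmpty above sees an empty result. A hypothetical caller sketch, assuming a SnapshotManager instance named snapshotManager:

    Long compactedId = snapshotManager.latestCompactedSnapshotId();
    if (compactedId == null) {
        // No full compaction has run yet; the planned scan yields zero splits.
    }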
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index e155ec1f..9cdda41b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -103,6 +103,11 @@ public abstract class DataTableScan implements TableScan {
return this;
}
+ public DataTableScan withReadCompacted(boolean readCompacted) {
+ scan.withReadCompacted(readCompacted);
+ return this;
+ }
+
@VisibleForTesting
public DataTableScan withBucket(int bucket) {
scan.withBucket(bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
index 067ef812..e42dcbda 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -122,7 +122,10 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
if (options.changelogProducer() == FULL_COMPACTION) {
// Read the results of the last full compaction.
// Only full compaction results will appear on the max level.
- plan = scan.withLevel(options.numLevels() - 1).plan();
+ plan =
+ scan.withReadCompacted(options.readCompacted())
+ .withLevel(options.numLevels() - 1)
+ .plan();
} else {
plan = scan.plan();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 58c7d088..95a2ad2b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -42,6 +43,7 @@ import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.types.logical.LogicalType;
@@ -58,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -316,6 +319,58 @@ public abstract class FileStoreTableTestBase {
}
}
+ @Test
+ public void testReadCompactedSnapshot() throws Exception {
+ writeCompactData();
+ FileStoreTable table = createFileStoreTable();
+
+ DataTableScan.DataFilePlan plan = table.newScan().withReadCompacted(true).plan();
+ Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+ Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+ while (snapshotIterator.hasNext()) {
+ Snapshot snapshot = snapshotIterator.next();
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+ }
+ }
+
+ assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ List<Split> splits = plan.splits();
+ TableRead read = table.newRead();
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+ .isGreaterThan(0);
+ assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+ .isGreaterThan(0);
+
+ List<Split> compactedSplits =
+ table.newScan().withSnapshot(compactedSnapshot.id()).plan().splits();
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .isEqualTo(getResult(read, compactedSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING));
+ assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+ .isEqualTo(getResult(read, compactedSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING));
+ }
+
+ private void writeCompactData() throws Exception {
+ final int batchCount = 10;
+ final int countPerBatch = 10000;
+ FileStoreTable table =
+ createFileStoreTable(
+ conf ->
+ conf.set(CoreOptions.BUCKET, 1)
+ .set(CoreOptions.READ_COMPACTED, true)
+ .set(CoreOptions.COMPACTION_MAX_FILE_NUM, 10));
+ TableWrite write = table.newWrite("user");
+ TableCommit commit = table.newCommit("user");
+
+ for (int i = 0; i < batchCount; i++) {
+ for (int j = 0; j < countPerBatch; j++) {
+ write.write(rowData(j % 2 + 1, i * j, j * 10L));
+ }
+ commit.commit(i, write.prepareCommit(true, i));
+ }
+ write.close();
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,