Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/09 09:46:37 UTC
[flink-table-store] branch master updated: [FLINK-30333] Supports lookup of a partial-update table
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e33f36ff [FLINK-30333] Supports lookup of a partial-update table
e33f36ff is described below
commit e33f36ff07d13e55d26da81ebbc916d1fe0c2e15
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Dec 9 17:46:31 2022 +0800
[FLINK-30333] Supports lookup of a partial-update table
This closes #430.
---
.../connector/lookup/FileStoreLookupFunction.java | 3 +
.../store/connector/source/FlinkSourceBuilder.java | 25 +-------
.../table/store/connector/LookupJoinITCase.java | 67 ++++++++++++++++++++++
.../org/apache/flink/table/store/CoreOptions.java | 4 ++
.../store/table/source/TableStreamingReader.java | 10 +---
.../ContinuousDataFileSnapshotEnumerator.java | 66 +++++++++++++++++----
6 files changed, 131 insertions(+), 44 deletions(-)
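Taken together, these changes let a lookup join consume a partial-update table once the full-compaction changelog producer is enabled. A minimal end-to-end sketch, assuming a Flink SQL session with a Table Store catalog already configured and a probe table T with a processing-time attribute proctime (table and column names are illustrative, adapted from the new ITCases):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PartialUpdateLookupSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Partial-update dimension table; 'changelog-producer'='full-compaction'
            // is required, otherwise the validation added in this commit rejects
            // the continuous read behind the lookup.
            env.executeSql(
                    "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT)"
                            + " WITH ('merge-engine'='partial-update',"
                            + " 'changelog-producer'='full-compaction')");
            // Lookup join against the merged (partially updated) view of DIM.
            env.executeSql(
                    "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM"
                            + " FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i");
        }
    }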
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index 4f565fe0..ff67dcbf 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.TableStreamingReader;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FileUtils;
@@ -85,6 +86,8 @@ public class FileStoreLookupFunction extends TableFunction<RowData> {
checkArgument(
schema.partitionKeys().isEmpty(), "Currently only support non-partitioned table.");
checkArgument(schema.primaryKeys().size() > 0, "Currently only support primary key table.");
+ ContinuousDataFileSnapshotEnumerator.validate(table.schema());
+
this.table = table;
// join keys are based on projection fields
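With this guard, an unsupported table fails fast when the lookup function opens, with the same message as the streaming source. Condensed from the new testLookupPartialUpdateIllegal below (a configured TableEnvironment env and AssertJ on the classpath are assumed):

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    // DIM2 uses 'merge-engine'='partial-update' without the full-compaction
    // changelog producer, so the lookup join is rejected up front.
    String query =
            "SELECT T.i, D.j FROM T LEFT JOIN DIM2"
                    + " FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
    assertThatThrownBy(() -> env.executeSql(query))
            .hasRootCauseMessage(
                    "Partial update continuous reading is not supported. "
                            + "You can use full compaction changelog producer to support streaming reading.");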
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index b2495234..97217cee 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -25,29 +25,24 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.CoreOptions.MergeEngine;
import org.apache.flink.table.store.CoreOptions.StartupMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
-import java.util.HashMap;
import java.util.Optional;
-import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
/** Source builder to build a Flink {@link Source}. */
@@ -128,23 +123,7 @@ public class FlinkSourceBuilder {
private Source<RowData, ?, ?> buildSource() {
if (isContinuous) {
- // TODO move validation to a dedicated method
- MergeEngine mergeEngine = conf.get(MERGE_ENGINE);
- HashMap<MergeEngine, String> mergeEngineDesc =
- new HashMap<MergeEngine, String>() {
- {
- put(MergeEngine.PARTIAL_UPDATE, "Partial update");
- put(MergeEngine.AGGREGATE, "Pre-aggregate");
- }
- };
- if (table.schema().primaryKeys().size() > 0
- && mergeEngineDesc.containsKey(mergeEngine)
- && conf.get(CHANGELOG_PRODUCER) != FULL_COMPACTION) {
- throw new ValidationException(
- mergeEngineDesc.get(mergeEngine)
- + " continuous reading is not supported. "
- + "You can use full compaction changelog producer to support streaming reading.");
- }
+ ContinuousDataFileSnapshotEnumerator.validate(table.schema());
// TODO visit all options through CoreOptions
StartupMode startupMode = CoreOptions.startupMode(conf);
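The inline map-based check above is deleted in favor of the shared ContinuousDataFileSnapshotEnumerator.validate, so the streaming source and the lookup function now enforce the same rule. The rule also covers pre-aggregate tables; a hedged sketch, assuming a configured TableEnvironment env (the 'aggregation' and 'fields.v.aggregate-function' option strings are assumptions here, only 'partial-update' appears in this patch's tests):

    // A primary-key table with a merging engine must produce a full-compaction
    // changelog to be read continuously, whichever entry point is used.
    env.executeSql(
            "CREATE TABLE AGG_DIM (i INT PRIMARY KEY NOT ENFORCED, v BIGINT)"
                    + " WITH ('merge-engine'='aggregation',"    // assumed value
                    + " 'fields.v.aggregate-function'='sum',"   // assumed key
                    + " 'changelog-producer'='full-compaction')");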
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 9b38dbc3..a0f502b9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for lookup join. */
public class LookupJoinITCase extends AbstractTestBase {
@@ -118,6 +119,36 @@ public class LookupJoinITCase extends AbstractTestBase {
iterator.close();
}
+ @Test
+ public void testLookupWithLatest() throws Exception {
+ executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('scan.mode'='latest') */"
+ + " for system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+ executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+ Thread.sleep(2000); // wait refresh
+ executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 44, 444, 4444),
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+
+ iterator.close();
+ }
+
@Test
public void testLookupProjection() throws Exception {
executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
@@ -448,6 +479,42 @@ public class LookupJoinITCase extends AbstractTestBase {
iterator.close();
}
+ @Test
+ public void testLookupPartialUpdateIllegal() throws Exception {
+ executeSql(
+ "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+ + " ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
+ assertThatThrownBy(() -> env.executeSql(query))
+ .hasRootCauseMessage(
+ "Partial update continuous reading is not supported. "
+ + "You can use full compaction changelog producer to support streaming reading.");
+ }
+
+ @Test
+ public void testLookupPartialUpdate() throws Exception {
+ executeSql(
+ "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+ + " ('merge-engine'='partial-update',"
+ + " 'changelog-producer'='full-compaction',"
+ + " 'changelog-producer.compaction-interval'='1 s',"
+ + " 'continuous.discovery-interval'='10 ms')");
+ executeSql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))");
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+ executeSql("INSERT INTO T VALUES (1)");
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, null, 111, null));
+
+ executeSql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
+ Thread.sleep(2000); // wait refresh
+ executeSql("INSERT INTO T VALUES (1)");
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111));
+
+ iterator.close();
+ }
+
private void executeSql(String sql) throws ExecutionException, InterruptedException {
env.executeSql(sql).await();
}
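The expected rows in testLookupPartialUpdate follow from partial-update merge semantics: each non-NULL field of an incoming record overwrites the stored field, while NULL fields leave it untouched. A stand-alone illustration of that merge (not table store code):

    // stored:   (1, null, 111, null)
    // incoming: (1, 11, null, 1111)
    // merged:   (1, 11, 111, 1111)  <- the row the second lookup observes
    static Integer[] partialUpdate(Integer[] stored, Integer[] incoming) {
        Integer[] merged = stored.clone();
        for (int i = 0; i < merged.length; i++) {
            if (incoming[i] != null) {
                merged[i] = incoming[i];
            }
        }
        return merged;
    }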
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index c6609070..f4512740 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -446,6 +446,10 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
+ public MergeEngine mergeEngine() {
+ return options.get(MERGE_ENGINE);
+ }
+
public long splitTargetSize() {
return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
}
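The new accessor mirrors the surrounding getters and is what the validate() method below reads. A usage sketch; the Map-based constructor is inferred from `new CoreOptions(schema.options())` later in this patch:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.store.CoreOptions;

    public class MergeEngineAccessorSketch {
        public static void main(String[] args) {
            Map<String, String> opts = new HashMap<>();
            opts.put("merge-engine", "partial-update");
            CoreOptions options = new CoreOptions(opts);
            // Prints PARTIAL_UPDATE; an unset key falls back to the option default.
            System.out.println(options.mergeEngine());
        }
    }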
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index 0a830780..dd3d286e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
-import org.apache.flink.table.store.table.source.snapshot.DeltaFollowUpScanner;
-import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -88,13 +86,7 @@ public class TableStreamingReader {
if (predicate != null) {
scan.withFilter(predicate);
}
- enumerator =
- new ContinuousDataFileSnapshotEnumerator(
- table.location(),
- scan,
- new FullStartingScanner(),
- new DeltaFollowUpScanner(),
- null);
+ enumerator = ContinuousDataFileSnapshotEnumerator.createWithSnapshotStarting(table, scan);
}
@Nullable
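Before this change the reader always combined a FullStartingScanner with a DeltaFollowUpScanner; delegating to createWithSnapshotStarting lets the lookup path pick a CompactedStartingScanner for 'scan.mode'='compacted' and the changelog-aware follow-up scanner for full-compaction tables. Dynamic table options can steer this per query, as exercised by the new testLookupWithLatest (a configured TableEnvironment env is assumed):

    // Dynamic option on the dimension side of a lookup join (from the ITCase):
    String query =
            "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM"
                    + " /*+ OPTIONS('scan.mode'='latest') */"
                    + " FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
    env.executeSql(query);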
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index 326f7e62..d1a462ef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -19,8 +19,10 @@
package org.apache.flink.table.store.table.source.snapshot;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -31,6 +33,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.HashMap;
+
+import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+
/** {@link SnapshotEnumerator} for streaming read. */
public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator {
@@ -102,17 +108,35 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
// static create methods
// ------------------------------------------------------------------------
+ public static ContinuousDataFileSnapshotEnumerator createWithSnapshotStarting(
+ FileStoreTable table, DataTableScan scan) {
+ StartingScanner startingScanner =
+ table.options().startupMode() == CoreOptions.StartupMode.COMPACTED
+ ? new CompactedStartingScanner()
+ : new FullStartingScanner();
+ return new ContinuousDataFileSnapshotEnumerator(
+ table.location(), scan, startingScanner, createFollowUpScanner(table, scan), null);
+ }
+
public static ContinuousDataFileSnapshotEnumerator create(
- FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
+ FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
+ return new ContinuousDataFileSnapshotEnumerator(
+ table.location(),
+ scan,
+ createStartingScanner(table),
+ createFollowUpScanner(table, scan),
+ nextSnapshotId);
+ }
+
+ private static StartingScanner createStartingScanner(FileStoreTable table) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
Long startupMillis = table.options().logScanTimestampMills();
- StartingScanner startingScanner;
if (startupMode == CoreOptions.StartupMode.FULL) {
- startingScanner = new FullStartingScanner();
+ return new FullStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.LATEST) {
- startingScanner = new ContinuousLatestStartingScanner();
+ return new ContinuousLatestStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
- startingScanner = new CompactedStartingScanner();
+ return new CompactedStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
Preconditions.checkNotNull(
startupMillis,
@@ -121,27 +145,45 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
CoreOptions.StartupMode.FROM_TIMESTAMP,
CoreOptions.SCAN_MODE.key()));
- startingScanner = new ContinuousFromTimestampStartingScanner(startupMillis);
+ return new ContinuousFromTimestampStartingScanner(startupMillis);
} else {
throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
}
+ }
+ private static FollowUpScanner createFollowUpScanner(FileStoreTable table, DataTableScan scan) {
CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
- FollowUpScanner followUpScanner;
if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
- followUpScanner = new DeltaFollowUpScanner();
+ return new DeltaFollowUpScanner();
} else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
- followUpScanner = new InputChangelogFollowUpScanner();
+ return new InputChangelogFollowUpScanner();
} else if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
// this change in scan will affect both starting scanner and follow-up scanner
scan.withLevel(table.options().numLevels() - 1);
- followUpScanner = new CompactionChangelogFollowUpScanner();
+ return new CompactionChangelogFollowUpScanner();
} else {
throw new UnsupportedOperationException(
"Unknown changelog producer " + changelogProducer.name());
}
+ }
- return new ContinuousDataFileSnapshotEnumerator(
- table.location(), scan, startingScanner, followUpScanner, nextSnapshotId);
+ public static void validate(TableSchema schema) {
+ CoreOptions options = new CoreOptions(schema.options());
+ CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+ HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
+ new HashMap<CoreOptions.MergeEngine, String>() {
+ {
+ put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial update");
+ put(CoreOptions.MergeEngine.AGGREGATE, "Pre-aggregate");
+ }
+ };
+ if (schema.primaryKeys().size() > 0
+ && mergeEngineDesc.containsKey(mergeEngine)
+ && options.changelogProducer() != FULL_COMPACTION) {
+ throw new ValidationException(
+ mergeEngineDesc.get(mergeEngine)
+ + " continuous reading is not supported. "
+ + "You can use full compaction changelog producer to support streaming reading.");
+ }
}
}
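For readers without the codebase at hand, the centralized check boils down to the following self-contained re-implementation over a plain options map (illustration only: the real method works on TableSchema and CoreOptions, throws Flink's ValidationException, and the 'aggregation' value string is an assumption):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ValidateSketch {
        // Simplified stand-in for ContinuousDataFileSnapshotEnumerator.validate.
        static void validate(List<String> primaryKeys, Map<String, String> options) {
            Map<String, String> mergeEngineDesc = new HashMap<>();
            mergeEngineDesc.put("partial-update", "Partial update");
            mergeEngineDesc.put("aggregation", "Pre-aggregate"); // value string assumed
            String mergeEngine = options.getOrDefault("merge-engine", "deduplicate");
            String changelogProducer = options.getOrDefault("changelog-producer", "none");
            if (!primaryKeys.isEmpty()
                    && mergeEngineDesc.containsKey(mergeEngine)
                    && !"full-compaction".equals(changelogProducer)) {
                throw new IllegalStateException( // real code: ValidationException
                        mergeEngineDesc.get(mergeEngine)
                                + " continuous reading is not supported. "
                                + "You can use full compaction changelog producer to support streaming reading.");
            }
        }

        public static void main(String[] args) {
            Map<String, String> opts = new HashMap<>();
            opts.put("merge-engine", "partial-update");
            // Throws: "Partial update continuous reading is not supported. ..."
            validate(Collections.singletonList("i"), opts);
        }
    }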