Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/09 09:46:37 UTC

[flink-table-store] branch master updated: [FLINK-30333] Supports lookup a partial-update table

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e33f36ff [FLINK-30333] Supports lookup a partial-update table
e33f36ff is described below

commit e33f36ff07d13e55d26da81ebbc916d1fe0c2e15
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Dec 9 17:46:31 2022 +0800

    [FLINK-30333] Supports lookup a partial-update table
    
    This closes #430.
---
 .../connector/lookup/FileStoreLookupFunction.java  |  3 +
 .../store/connector/source/FlinkSourceBuilder.java | 25 +-------
 .../table/store/connector/LookupJoinITCase.java    | 67 ++++++++++++++++++++++
 .../org/apache/flink/table/store/CoreOptions.java  |  4 ++
 .../store/table/source/TableStreamingReader.java   | 10 +---
 .../ContinuousDataFileSnapshotEnumerator.java      | 66 +++++++++++++++++----
 6 files changed, 131 insertions(+), 44 deletions(-)
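
For orientation, the capability added here is easiest to see in the SQL exercised by the new LookupJoinITCase tests below. The following is a sketch assembled from those tests (table and column names come from the test, not from any documented example): a table using the partial-update merge engine can now be the dimension side of a lookup join, provided it is configured with the full-compaction changelog producer; otherwise the new ContinuousDataFileSnapshotEnumerator.validate check rejects the read.

    -- Dimension table with the partial-update merge engine. Lookup/streaming
    -- reads of such a table require 'changelog-producer' = 'full-compaction'.
    CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH (
        'merge-engine' = 'partial-update',
        'changelog-producer' = 'full-compaction',
        'changelog-producer.compaction-interval' = '1 s',
        'continuous.discovery-interval' = '10 ms'
    );

    -- Lookup join that reads the latest merged view of DIM2 for each probe row.
    SELECT T.i, D.j, D.k1, D.k2
    FROM T LEFT JOIN DIM2
    FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i;

Without 'changelog-producer' = 'full-compaction', the same query fails with "Partial update continuous reading is not supported", as asserted in testLookupPartialUpdateIllegal.
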

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index 4f565fe0..ff67dcbf 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.predicate.PredicateFilter;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.TableStreamingReader;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.FileUtils;
@@ -85,6 +86,8 @@ public class FileStoreLookupFunction extends TableFunction<RowData> {
         checkArgument(
                 schema.partitionKeys().isEmpty(), "Currently only support non-partitioned table.");
         checkArgument(schema.primaryKeys().size() > 0, "Currently only support primary key table.");
+        ContinuousDataFileSnapshotEnumerator.validate(table.schema());
+
         this.table = table;
 
         // join keys are based on projection fields
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index b2495234..97217cee 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -25,29 +25,24 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.CoreOptions.MergeEngine;
 import org.apache.flink.table.store.CoreOptions.StartupMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
 import java.util.Optional;
 
-import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
 
 /** Source builder to build a Flink {@link Source}. */
@@ -128,23 +123,7 @@ public class FlinkSourceBuilder {
 
     private Source<RowData, ?, ?> buildSource() {
         if (isContinuous) {
-            // TODO move validation to a dedicated method
-            MergeEngine mergeEngine = conf.get(MERGE_ENGINE);
-            HashMap<MergeEngine, String> mergeEngineDesc =
-                    new HashMap<MergeEngine, String>() {
-                        {
-                            put(MergeEngine.PARTIAL_UPDATE, "Partial update");
-                            put(MergeEngine.AGGREGATE, "Pre-aggregate");
-                        }
-                    };
-            if (table.schema().primaryKeys().size() > 0
-                    && mergeEngineDesc.containsKey(mergeEngine)
-                    && conf.get(CHANGELOG_PRODUCER) != FULL_COMPACTION) {
-                throw new ValidationException(
-                        mergeEngineDesc.get(mergeEngine)
-                                + " continuous reading is not supported. "
-                                + "You can use full compaction changelog producer to support streaming reading.");
-            }
+            ContinuousDataFileSnapshotEnumerator.validate(table.schema());
 
             // TODO visit all options through CoreOptions
             StartupMode startupMode = CoreOptions.startupMode(conf);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 9b38dbc3..a0f502b9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for lookup join. */
 public class LookupJoinITCase extends AbstractTestBase {
@@ -118,6 +119,36 @@ public class LookupJoinITCase extends AbstractTestBase {
         iterator.close();
     }
 
+    @Test
+    public void testLookupWithLatest() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('scan.mode'='latest') */"
+                        + " for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+
     @Test
     public void testLookupProjection() throws Exception {
         executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
@@ -448,6 +479,42 @@ public class LookupJoinITCase extends AbstractTestBase {
         iterator.close();
     }
 
+    @Test
+    public void testLookupPartialUpdateIllegal() throws Exception {
+        executeSql(
+                "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
+        assertThatThrownBy(() -> env.executeSql(query))
+                .hasRootCauseMessage(
+                        "Partial update continuous reading is not supported. "
+                                + "You can use full compaction changelog producer to support streaming reading.");
+    }
+
+    @Test
+    public void testLookupPartialUpdate() throws Exception {
+        executeSql(
+                "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('merge-engine'='partial-update',"
+                        + " 'changelog-producer'='full-compaction',"
+                        + " 'changelog-producer.compaction-interval'='1 s',"
+                        + " 'continuous.discovery-interval'='10 ms')");
+        executeSql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))");
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+        executeSql("INSERT INTO T VALUES (1)");
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, null, 111, null));
+
+        executeSql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1)");
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111));
+
+        iterator.close();
+    }
+
     private void executeSql(String sql) throws ExecutionException, InterruptedException {
         env.executeSql(sql).await();
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index c6609070..f4512740 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -446,6 +446,10 @@ public class CoreOptions implements Serializable {
         return options.get(MANIFEST_MERGE_MIN_COUNT);
     }
 
+    public MergeEngine mergeEngine() {
+        return options.get(MERGE_ENGINE);
+    }
+
     public long splitTargetSize() {
         return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index 0a830780..dd3d286e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.predicate.PredicateFilter;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
-import org.apache.flink.table.store.table.source.snapshot.DeltaFollowUpScanner;
-import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.utils.TypeUtils;
 
@@ -88,13 +86,7 @@ public class TableStreamingReader {
         if (predicate != null) {
             scan.withFilter(predicate);
         }
-        enumerator =
-                new ContinuousDataFileSnapshotEnumerator(
-                        table.location(),
-                        scan,
-                        new FullStartingScanner(),
-                        new DeltaFollowUpScanner(),
-                        null);
+        enumerator = ContinuousDataFileSnapshotEnumerator.createWithSnapshotStarting(table, scan);
     }
 
     @Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index 326f7e62..d1a462ef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.store.table.source.snapshot;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -31,6 +33,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+
+import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+
 /** {@link SnapshotEnumerator} for streaming read. */
 public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator {
 
@@ -102,17 +108,35 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
     //  static create methods
     // ------------------------------------------------------------------------
 
+    public static ContinuousDataFileSnapshotEnumerator createWithSnapshotStarting(
+            FileStoreTable table, DataTableScan scan) {
+        StartingScanner startingScanner =
+                table.options().startupMode() == CoreOptions.StartupMode.COMPACTED
+                        ? new CompactedStartingScanner()
+                        : new FullStartingScanner();
+        return new ContinuousDataFileSnapshotEnumerator(
+                table.location(), scan, startingScanner, createFollowUpScanner(table, scan), null);
+    }
+
     public static ContinuousDataFileSnapshotEnumerator create(
-            FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
+            FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
+        return new ContinuousDataFileSnapshotEnumerator(
+                table.location(),
+                scan,
+                createStartingScanner(table),
+                createFollowUpScanner(table, scan),
+                nextSnapshotId);
+    }
+
+    private static StartingScanner createStartingScanner(FileStoreTable table) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
         Long startupMillis = table.options().logScanTimestampMills();
-        StartingScanner startingScanner;
         if (startupMode == CoreOptions.StartupMode.FULL) {
-            startingScanner = new FullStartingScanner();
+            return new FullStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.LATEST) {
-            startingScanner = new ContinuousLatestStartingScanner();
+            return new ContinuousLatestStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
-            startingScanner = new CompactedStartingScanner();
+            return new CompactedStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
             Preconditions.checkNotNull(
                     startupMillis,
@@ -121,27 +145,45 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
                             CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
                             CoreOptions.StartupMode.FROM_TIMESTAMP,
                             CoreOptions.SCAN_MODE.key()));
-            startingScanner = new ContinuousFromTimestampStartingScanner(startupMillis);
+            return new ContinuousFromTimestampStartingScanner(startupMillis);
         } else {
             throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
         }
+    }
 
+    private static FollowUpScanner createFollowUpScanner(FileStoreTable table, DataTableScan scan) {
         CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
-        FollowUpScanner followUpScanner;
         if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
-            followUpScanner = new DeltaFollowUpScanner();
+            return new DeltaFollowUpScanner();
         } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
-            followUpScanner = new InputChangelogFollowUpScanner();
+            return new InputChangelogFollowUpScanner();
         } else if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
             // this change in scan will affect both starting scanner and follow-up scanner
             scan.withLevel(table.options().numLevels() - 1);
-            followUpScanner = new CompactionChangelogFollowUpScanner();
+            return new CompactionChangelogFollowUpScanner();
         } else {
             throw new UnsupportedOperationException(
                     "Unknown changelog producer " + changelogProducer.name());
         }
+    }
 
-        return new ContinuousDataFileSnapshotEnumerator(
-                table.location(), scan, startingScanner, followUpScanner, nextSnapshotId);
+    public static void validate(TableSchema schema) {
+        CoreOptions options = new CoreOptions(schema.options());
+        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+        HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
+                new HashMap<CoreOptions.MergeEngine, String>() {
+                    {
+                        put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial update");
+                        put(CoreOptions.MergeEngine.AGGREGATE, "Pre-aggregate");
+                    }
+                };
+        if (schema.primaryKeys().size() > 0
+                && mergeEngineDesc.containsKey(mergeEngine)
+                && options.changelogProducer() != FULL_COMPACTION) {
+            throw new ValidationException(
+                    mergeEngineDesc.get(mergeEngine)
+                            + " continuous reading is not supported. "
+                            + "You can use full compaction changelog producer to support streaming reading.");
+        }
     }
 }