Posted to commits@flink.apache.org by li...@apache.org on 2023/01/16 10:00:53 UTC

[flink] 04/04: [FLINK-30661][table] introduce SupportsRowLevelUpdate interface

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 904c695776a6c28e67c50ce21115141a99446aa8
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jan 13 12:05:20 2023 +0800

    [FLINK-30661][table] introduce SupportsRowLevelUpdate interface
---
 .../connector/RowLevelModificationScanContext.java |   1 +
 .../sink/abilities/SupportsRowLevelUpdate.java     | 113 +++++++++++++++++++++
 .../SupportsRowLevelModificationScan.java          |   1 +
 3 files changed, 115 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
index 7ca79b43ff6..1468bf5cb7d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 
 /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java
new file mode 100644
index 00000000000..65945332f56
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support updating existing data according to
+ * row-level changes. The table sink is responsible for telling the planner how to produce the row
+ * changes, and for consuming them in order to update the target rows.
+ *
+ * <p>The planner will call the method {@link #applyRowLevelUpdate(List,
+ * RowLevelModificationScanContext)} to get the {@link RowLevelUpdateInfo} returned by the sink,
+ * and rewrite the update statement based on the retrieved {@link RowLevelUpdateInfo} to produce
+ * the rows for the {@link DynamicTableSink}.
+ */
+@PublicEvolving
+public interface SupportsRowLevelUpdate {
+
+    /**
+     * Applies a row-level update, providing the updated columns and the {@link
+     * RowLevelModificationScanContext}, and returns the {@link RowLevelUpdateInfo}.
+     *
+     * @param updatedColumns the columns updated by the update statement, in table column order.
+     * @param context the context passed by the table source which implements {@link
+     *     SupportsRowLevelModificationScan}. It will be null if the table source doesn't implement
+     *     it.
+     */
+    RowLevelUpdateInfo applyRowLevelUpdate(
+            List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planner on how to rewrite the update statement. */
+    @PublicEvolving
+    interface RowLevelUpdateInfo {
+
+        /**
+         * The columns required by the sink to perform the row-level update. The rows consumed by
+         * the sink will contain the required columns in order. If Optional.empty() is returned,
+         * the rows will contain all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * The planner will rewrite the update statement to a query based on the {@link
+         * RowLevelUpdateMode}, keeping the query of the update unchanged by default (in
+         * `UPDATED_ROWS` mode), or changing the query to a union of the updated rows and the
+         * other rows (in `ALL_ROWS` mode).
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * UPDATE t SET x = 1 WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If {@link RowLevelUpdateMode#UPDATED_ROWS} is returned, the sink will only get the
+         * update-after rows which match the filter [y = 2].
+         *
+         * <p>If {@link RowLevelUpdateMode#ALL_ROWS} is returned, the sink will get both the
+         * update-after rows which match the filter [y = 2] and the other rows that don't match
+         * the filter [y = 2].
+         *
+         * <p>Note: All rows will have RowKind#UPDATE_AFTER when RowLevelUpdateMode is UPDATED_ROWS,
+         * and RowKind#INSERT when RowLevelUpdateMode is ALL_ROWS.
+         */
+        default RowLevelUpdateMode getRowLevelUpdateMode() {
+            return RowLevelUpdateMode.UPDATED_ROWS;
+        }
+    }
+
+    /**
+     * The update modes in which the sink expects to receive rows for the update operation.
+     *
+     * <p>Currently, two modes are supported:
+     *
+     * <ul>
+     *   <li>UPDATED_ROWS - in this mode, the sink will only get the update-after rows.
+     *   <li>ALL_ROWS - in this mode, the sink will get all the rows, including both the
+     *       update-after rows and the other rows that don't need to be updated.
+     * </ul>
+     */
+    @PublicEvolving
+    enum RowLevelUpdateMode {
+        UPDATED_ROWS,
+        ALL_ROWS
+    }
+}
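
For reference, below is a minimal sketch of how a connector's DynamicTableSink
might implement the new interface. The class name ExampleUpdatableSink and its
field are hypothetical and not part of this commit; the sketch relies on the
interface's defaults (all columns required, UPDATED_ROWS mode) and omits the
connector-specific runtime writing logic.

    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.RowLevelModificationScanContext;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;

    import javax.annotation.Nullable;

    import java.util.List;

    /** Hypothetical sink used only to illustrate the new ability interface. */
    public class ExampleUpdatableSink implements DynamicTableSink, SupportsRowLevelUpdate {

        // Columns targeted by the SET clause, as reported by the planner.
        @Nullable private List<Column> updatedColumns;

        @Override
        public RowLevelUpdateInfo applyRowLevelUpdate(
                List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
            this.updatedColumns = updatedColumns;
            // Use the interface defaults: requiredColumns() is Optional.empty() (all columns)
            // and getRowLevelUpdateMode() is UPDATED_ROWS.
            return new RowLevelUpdateInfo() {};
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            // Accept whatever the planner produces; in UPDATED_ROWS mode these are
            // UPDATE_AFTER rows only.
            return ChangelogMode.all();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            // The actual writing logic is connector-specific and omitted in this sketch.
            throw new UnsupportedOperationException("not part of this sketch");
        }

        @Override
        public DynamicTableSink copy() {
            ExampleUpdatableSink copy = new ExampleUpdatableSink();
            copy.updatedColumns = updatedColumns;
            return copy;
        }

        @Override
        public String asSummaryString() {
            return "ExampleUpdatableSink";
        }
    }

With UPDATED_ROWS the sink above only receives the rows that actually match the
UPDATE filter (as UPDATE_AFTER), so it would typically merge them into existing
data; returning ALL_ROWS from getRowLevelUpdateMode() would instead make the
planner emit every row of the table as INSERT, which typically suits sinks that
rewrite the whole table or the affected files.
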
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
index 803816d3c98..fda171903d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source.abilities;
 
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.source.ScanTableSource;
 
 import javax.annotation.Nullable;