You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/16 10:00:49 UTC

[flink] branch master updated (1c524794362 -> 904c695776a)

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 1c524794362 [FLINK-30617][hive] Fix wrong data type for cast null literal with Hive dialect
     new 6cf7e5d2ba4 [FLINK-30661][table] introduce SupportsRowLevelModificationScan interface
     new 69837fc6145 [FLINK-30661][table] introduce SupportsDeletePushDown interface
     new e25ea3fe1d9 [FLINK-30661][table] introduce SupportsRowLevelDelete interface
     new 904c695776a [FLINK-30661][table] introduce SupportsRowLevelUpdate interface

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/RowLevelModificationScanContext.java |  39 +++++++
 .../sink/abilities/SupportsDeletePushDown.java     |  82 +++++++++++++++
 .../sink/abilities/SupportsRowLevelDelete.java     | 114 +++++++++++++++++++++
 .../sink/abilities/SupportsRowLevelUpdate.java     | 113 ++++++++++++++++++++
 .../SupportsRowLevelModificationScan.java          |  80 +++++++++++++++
 5 files changed, 428 insertions(+)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java


[flink] 04/04: [FLINK-30661][table] introduce SupportsRowLevelUpdate interface

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 904c695776a6c28e67c50ce21115141a99446aa8
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jan 13 12:05:20 2023 +0800

    [FLINK-30661][table] introduce SupportsRowLevelUpdate interface
---
 .../connector/RowLevelModificationScanContext.java |   1 +
 .../sink/abilities/SupportsRowLevelUpdate.java     | 113 +++++++++++++++++++++
 .../SupportsRowLevelModificationScan.java          |   1 +
 3 files changed, 115 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
index 7ca79b43ff6..1468bf5cb7d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 
 /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java
new file mode 100644
index 00000000000..65945332f56
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support update existing data according to row-level
+ * changes. The table sink is responsible for telling planner how to produce the row changes, and
+ * consuming them to achieve the purpose of row(s) update.
+ *
+ * <p>The planner will call method {@link #applyRowLevelUpdate(List,
+ * RowLevelModificationScanContext)} to get the {@link RowLevelUpdateInfo} returned by sink, and
+ * rewrite the update statement based on the retrieved {@link RowLevelUpdateInfo} to produce rows to
+ * {@link DynamicTableSink}.
+ */
+@PublicEvolving
+public interface SupportsRowLevelUpdate {
+
+    /**
+     * Applies row-level update with providing the updated columns and {@link
+     * RowLevelModificationScanContext}, and return {@link RowLevelUpdateInfo}.
+     *
+     * @param updatedColumns the columns updated by update statement in table column order.
+     * @param context the context passed by table source which implement {@link
+     *     SupportsRowLevelModificationScan}. It'll be null if the table source doesn't implement
+     *     it.
+     */
+    RowLevelUpdateInfo applyRowLevelUpdate(
+            List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planner on how to rewrite the update statement. */
+    @PublicEvolving
+    interface RowLevelUpdateInfo {
+
+        /**
+         * The required columns by the sink to perform row-level update. The rows consumed by sink
+         * will contain the required columns in order. If return Optional.empty(), it will contain
+         * all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * Planner will rewrite the update statement to query base on the {@link
+         * RowLevelUpdateMode}, keeping the query of update unchanged by default(in `UPDATED_ROWS`
+         * mode), or changing the query to union the updated rows and the other rows (in `ALL_ROWS`
+         * mode).
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * UPDATE t SET x = 1 WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If returns {@link RowLevelUpdateMode#UPDATED_ROWS}, the sink will get the update after
+         * rows which match the filter [y = 2].
+         *
+         * <p>If returns {@link RowLevelUpdateMode#ALL_ROWS}, the sink will get both the update
+         * after rows which match the filter [y = 2] and the other rows that don't match the filter
+         * [y = 2].
+         *
+         * <p>Note: All rows will have RowKind#UPDATE_AFTER when RowLevelUpdateMode is UPDATED_ROWS,
+         * and RowKind#INSERT when RowLevelUpdateMode is ALL_ROWS.
+         */
+        default RowLevelUpdateMode getRowLevelUpdateMode() {
+            return RowLevelUpdateMode.UPDATED_ROWS;
+        }
+    }
+
+    /**
+     * Type of update modes that the sink expects for update purpose.
+     *
+     * <p>Currently, two modes are supported:
+     *
+     * <ul>
+     *   <li>UPDATED_ROWS - in this mode, the sink will only get the update after rows.
+     *   <li>ALL_ROWS - in this mode, the sink will get all the rows including both the update after
+     *       rows and the other rows that don't need to be updated.
+     * </ul>
+     */
+    @PublicEvolving
+    enum RowLevelUpdateMode {
+        UPDATED_ROWS,
+        ALL_ROWS
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
index 803816d3c98..fda171903d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source.abilities;
 
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.source.ScanTableSource;
 
 import javax.annotation.Nullable;


[flink] 01/04: [FLINK-30661][table] introduce SupportsRowLevelModificationScan interface

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cf7e5d2ba48bd565ab3ec2841059606a49668b1
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jan 13 11:24:38 2023 +0800

    [FLINK-30661][table] introduce SupportsRowLevelModificationScan interface
---
 .../connector/RowLevelModificationScanContext.java | 37 ++++++++++
 .../SupportsRowLevelModificationScan.java          | 78 ++++++++++++++++++++++
 2 files changed, 115 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
new file mode 100644
index 00000000000..f0e3c35ba36
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+/**
+ * The context is intended to provide the relevant table scan information needed by the sink to
+ * perform row-level update/delete. It'll be generated by a table source that implements {@link
+ * SupportsRowLevelModificationScan}, and then passed to a sink which implements {@link
+ * SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete} for executing UPDATE/DELETE statement
+ * during compilation phase.
+ *
+ * <p>This mechanism enables the coordination between the table sources and the table sink which is
+ * to be updated/deleted.
+ *
+ * <p>Connectors can implement this interface to provide custom information.
+ */
+@PublicEvolving
+public interface RowLevelModificationScanContext {}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
new file mode 100644
index 00000000000..bbf0bfa969b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level modification. The table source
+ * is responsible for returning the information described by {@link
+ * RowLevelModificationScanContext}. The context will be propagated to the sink which implements
+ * {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}.
+ *
+ * <p>Note: This interface is optional for table sources to implement. For cases where the table
+ * source neither needs to know the type of row-level modification nor propagate information to
+ * sink, the table source does't need to implement this interface. See more details at {@link
+ * #applyRowLevelModificationScan(RowLevelModificationType, RowLevelModificationScanContext)}.
+ */
+public interface SupportsRowLevelModificationScan {
+
+    /**
+     * Applies the type of row-level modification and the previous {@link
+     * RowLevelModificationScanContext} returned by previous table source scan, return a new {@link
+     * RowLevelModificationScanContext}. If the table source is the last one, the {@link
+     * RowLevelModificationScanContext} will be passed to the table sink. Otherwise, it will be
+     * passed to the following table source.
+     *
+     * <p>Note: For the all tables in the UPDATE/DELETE statement, this method will be involved for
+     * the corresponding table source scan.
+     *
+     * <p>Note: It may have multiple table sources in the case of sub-query. In such case, it will
+     * return multiple {@link RowLevelModificationScanContext}s. To handle such case, the planner
+     * will also pass the previous {@link RowLevelModificationScanContext} to the current table
+     * source scan which is expected to decide what to do with the previous {@link
+     * RowLevelModificationScanContext}. The order is consistent with the compilation order of the
+     * table sources. The planer will only pass the last context returned to the sink.
+     *
+     * @param previousContext the context returned by previous table source, if there's no previous
+     *     context, it'll be null.
+     */
+    RowLevelModificationScanContext applyRowLevelModificationScan(
+            RowLevelModificationType rowLevelModificationType,
+            @Nullable RowLevelModificationScanContext previousContext);
+
+    /**
+     * Type of the row-level modification for table.
+     *
+     * <p>Currently, two types are supported:
+     *
+     * <ul>
+     *   <li>UPDATE
+     *   <li>DELETE
+     * </ul>
+     */
+    enum RowLevelModificationType {
+        UPDATE,
+
+        DELETE
+    }
+}


[flink] 03/04: [FLINK-30661][table] introduce SupportsRowLevelDelete interface

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e25ea3fe1d9ed3ab9f62ac01eb4aaec57b2c6ed2
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jan 13 12:03:45 2023 +0800

    [FLINK-30661][table] introduce SupportsRowLevelDelete interface
---
 .../connector/RowLevelModificationScanContext.java |   1 +
 .../sink/abilities/SupportsRowLevelDelete.java     | 114 +++++++++++++++++++++
 .../SupportsRowLevelModificationScan.java          |   1 +
 3 files changed, 116 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
index f0e3c35ba36..7ca79b43ff6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 
 /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java
new file mode 100644
index 00000000000..5aaf89439c2
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support delete existing data according to row-level
+ * changes. The table sink is responsible for telling planner how to produce the row changes, and
+ * consuming them to achieve the purpose of row(s) deletion.
+ *
+ * <p>The planner will call the method {@link #applyRowLevelDelete(RowLevelModificationScanContext)}
+ * to get the {@link RowLevelDeleteInfo} returned by sink, and rewrite the delete statement based on
+ * the retrieved {@link RowLevelDeleteInfo} to produce rows to {@link DynamicTableSink}.
+ *
+ * <p>Note: For the cases where the table sink implement both {@link SupportsDeletePushDown} and
+ * {@link SupportsRowLevelDelete}, the planner always prefers {@link SupportsDeletePushDown} over
+ * {@link SupportsRowLevelDelete} on condition that {@link
+ * SupportsDeletePushDown#applyDeleteFilters(List)} return true.
+ */
+@PublicEvolving
+public interface SupportsRowLevelDelete {
+
+    /**
+     * Applies row-level delete with {@link RowLevelModificationScanContext}, and return a {@link
+     * RowLevelDeleteInfo}.
+     *
+     * @param context the context passed by table source which implement {@link
+     *     SupportsRowLevelModificationScan}. It'll be null if the table source doesn't implement
+     *     it.
+     */
+    RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planner on how to rewrite the delete statement. */
+    @PublicEvolving
+    interface RowLevelDeleteInfo {
+
+        /**
+         * The required columns by the sink to perform row-level delete. The rows consumed by sink
+         * will contain the required columns in order. If return Optional.empty(), it will contain
+         * all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * Planner will rewrite delete statement to query base on the {@link RowLevelDeleteInfo},
+         * keeping the query of delete unchanged by default(in `DELETE_ROWS` mode), or changing the
+         * query to the complementary set in REMAINING_ROWS mode.
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * DELETE FROM t WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#DELETED_ROWS}, the sink
+         * will get the rows to be deleted which match the filter [y = 2].
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#REMAINING_ROWS}, the sink
+         * will get the rows which don't match the filter [y = 2].
+         *
+         * <p>Note: All rows will be of RowKind#DELETE when RowLevelDeleteMode is DELETED_ROWS, and
+         * RowKind#INSERT when RowLevelDeleteMode is REMAINING_ROWS.
+         */
+        default RowLevelDeleteMode getRowLevelDeleteMode() {
+            return RowLevelDeleteMode.DELETED_ROWS;
+        }
+    }
+
+    /**
+     * Type of delete modes that the sink expects for delete purpose.
+     *
+     * <p>Currently, two modes are supported:
+     *
+     * <ul>
+     *   <li>DELETED_ROWS - in this mode, the sink will only get the rows that need to be deleted.
+     *   <li>REMAINING_ROWS - in this mode, the sink will only get the remaining rows after
+     *       deletion.
+     * </ul>
+     */
+    @PublicEvolving
+    enum RowLevelDeleteMode {
+        DELETED_ROWS,
+
+        REMAINING_ROWS
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
index bbf0bfa969b..803816d3c98 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.connector.source.abilities;
 
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.source.ScanTableSource;
 
 import javax.annotation.Nullable;


[flink] 02/04: [FLINK-30661][table] introduce SupportsDeletePushDown interface

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69837fc6145d7491c0dc7aeace43317f56c0e6aa
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jan 13 11:29:31 2023 +0800

    [FLINK-30661][table] introduce SupportsDeletePushDown interface
---
 .../sink/abilities/SupportsDeletePushDown.java     | 82 ++++++++++++++++++++++
 1 file changed, 82 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java
new file mode 100644
index 00000000000..8220812dbad
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enables to push down filters decomposed from the {@code WHERE} clause in delete statement to
+ * {@link DynamicTableSink}. The table sink can delete existing data directly according to the
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters(List)} in the planning phase. If it returns true, Flink will
+ * then call {@link #executeDeletion()} to execute the actual deletion during execution phase.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;*
+ * }</pre>
+ *
+ * <p>In the example above, the {@code WHERE} clause will be decomposed into two filters
+ *
+ * <ul>
+ *   <li>{@code [a = '1' OR a = '2']}
+ *   <li>{@code [b IS NOT NULL]}
+ * </ul>
+ *
+ * <p>If the sink can accept both filters which means the sink can delete data directly according to
+ * the filters, {@link #applyDeleteFilters(List)} should return true. Otherwise, it should return
+ * false.
+ *
+ * <p>Note: For the cases where the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters(List)} returns false, if the sink implements {@link SupportsRowLevelDelete},
+ * Flink will try to rewrite the delete statement and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */
+@PublicEvolving
+public interface SupportsDeletePushDown {
+
+    /**
+     * Provides a list of filters specified by {@code WHERE} clause in conjunctive form and return
+     * the acceptance status to planner during planning phase.
+     *
+     * @param filters a list of resolved filter expressions.
+     * @return true if the sink accepts all filters; false otherwise.
+     */
+    boolean applyDeleteFilters(List<ResolvedExpression> filters);
+
+    /**
+     * Deletes data during execution phase.
+     *
+     * <p>Note: The method will be involved iff the method {@link #applyDeleteFilters(List)} returns
+     * true.
+     *
+     * @return the number of the estimated rows to be deleted, or {@link Optional#empty()} for the
+     *     unknown condition.
+     */
+    Optional<Long> executeDeletion();
+}