Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/05 04:34:40 UTC

[flink-table-store] branch master updated: [FLINK-30560] Add more description of 'Overwriting a Partition' to doc 'Writing Tables'

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5c0e24 [FLINK-30560] Add more description of 'Overwriting a Partition' to doc 'Writing Tables'
ad5c0e24 is described below

commit ad5c0e24255c55b3fd478fb6e509b2191c068a90
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu Jan 5 12:34:36 2023 +0800

    [FLINK-30560] Add more description of 'Overwriting a Partition' to doc 'Writing Tables'
    
    This closes #457
---
 docs/content/docs/how-to/writing-tables.md         | 110 ++++++++++++++++++++
 .../store/connector/ReadWriteTableITCase.java      | 113 +++++++++++++++++++++
 2 files changed, 223 insertions(+)

diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index fd3e3ba5..db0e4661 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -26,6 +26,59 @@ under the License.
 
 # Writing Tables
 
+You can use the `INSERT` statement to insert new rows into a table
+or to overwrite the existing data in the table. The inserted rows can
+be specified by value expressions or by the results of a query.
+
+## Syntax
+
+```sql
+INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
+```
+- part_spec
+
+    An optional parameter that specifies a comma-separated list of key and value pairs for partitions. 
+    Note that one can use a typed literal (e.g., date'2019-01-02') in the partition spec.
+
+    Syntax: PARTITION ( partition_col_name = partition_col_val [ , ... ] )
+
+- column_list
+
+    An optional parameter that specifies a comma-separated list of columns belonging to the 
+    table_identifier table.
+    
+    Syntax: (col_name1 [, col_name2, ...])
+    
+    {{< hint info >}}
+
+    All specified columns must exist in the table and must not be duplicated.
+    The column list covers all columns except the static partition columns.
+
+    The size of the column list must exactly match the number of columns produced by the VALUES clause or the query.
+    
+    {{< /hint >}}
+
+- value_expr
+
+    Specifies the values to be inserted. Either an explicitly specified value or a NULL can be 
+    inserted. A comma must be used to separate each value in the clause. More than one set of 
+    values can be specified to insert multiple rows.
+
+    Syntax: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
+
+    {{< hint info >}}
+
+    Currently, Flink doesn't support using NULL directly; a NULL must be cast to the actual
+    data type with `CAST (NULL AS data_type)` (see the example after this list).
+
+    {{< /hint >}}
+
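+As a quick illustration, assume a hypothetical partitioned table `MyTable (k INT, v STRING) PARTITIONED BY (dt STRING)`; the pieces above combine like this:
+
+```sql
+-- static partition spec, explicit column list, and a NULL cast to the column's type
+INSERT INTO MyTable PARTITION (dt = '2023-01-01') (k, v)
+VALUES (1, 'hi'), (2, CAST(NULL AS STRING));
+```
+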
+For more information, please check the syntax document:
+
+[Flink INSERT Statement](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/)
+
+[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
+
 ## Applying Records/Changes to Tables
 
 {{< tabs "insert-into-example" >}}
@@ -87,3 +140,60 @@ INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ..
 {{< /tab >}}
 
 {{< /tabs >}}
+
+## Purging Tables
+
+You can use `INSERT OVERWRITE` to purge a table by overwriting it with an empty result set.
+
+{{< tabs "purge-tables-syntax" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Purging a Partition
+
+Similarly, you can use `INSERT OVERWRITE` to purge the data of a partition by overwriting the partition with an empty result set:
+
+{{< tabs "purge-partition-syntax" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+The following SQL is an example:
+
+{{< tabs "purge-partition-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- table definition
+CREATE TABLE MyTable (
+    k0 INT,
+    k1 INT,
+    v STRING
+) PARTITIONED BY (k0, k1);
+
+-- you can use
+INSERT OVERWRITE MyTable PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false
+
+-- or
+INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 0cc8fc07..cbdaa43e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -41,6 +42,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Test;
 
@@ -381,6 +383,76 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
                         changelogRow("+I", "US Dollar")));
     }
 
+    @Test
+    public void testPurgeTableUsingBatchOverWrite() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
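+        // register a Table Store catalog backed by a temporary warehouse directory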
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG test_catalog WITH ("
+                                + "'type'='table-store', 'warehouse'='%s')",
+                        TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog("test_catalog");
+        tEnv.executeSql("CREATE TABLE test (k1 INT, k2 STRING, v STRING)");
+
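+        // overwrite the whole table with an empty query; the table should end up empty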
+        validateOverwriteResult(tEnv, "test", "", "*", Collections.emptyList());
+    }
+
+    @Test
+    public void testPurgePartitionUsingBatchOverWrite() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG test_catalog WITH ("
+                                + "'type'='table-store', 'warehouse'='%s')",
+                        TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog("test_catalog");
+
+        // single partition key
+        tEnv.executeSql(
+                "CREATE TABLE test_single (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1)");
+
+        validateOverwriteResult(
+                tEnv,
+                "test_single",
+                "PARTITION (k1 = 0)",
+                "k2, v",
+                Arrays.asList(
+                        changelogRow("+I", 1, "2023-01-01", "flink"),
+                        changelogRow("+I", 1, "2023-01-02", "table"),
+                        changelogRow("+I", 1, "2023-01-02", "store")));
+
+        // multiple partition keys and overwrite one partition key
+        tEnv.executeSql(
+                "CREATE TABLE test_multiple0 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");
+
+        validateOverwriteResult(
+                tEnv,
+                "test_multiple0",
+                "PARTITION (k1 = 0)",
+                "k2, v",
+                Arrays.asList(
+                        changelogRow("+I", 1, "2023-01-01", "flink"),
+                        changelogRow("+I", 1, "2023-01-02", "table"),
+                        changelogRow("+I", 1, "2023-01-02", "store")));
+
+        // multiple partition keys and overwrite all partition keys
+        tEnv.executeSql(
+                "CREATE TABLE test_multiple1 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");
+
+        validateOverwriteResult(
+                tEnv,
+                "test_multiple1",
+                "PARTITION (k1 = 0, k2 = '2023-01-01')",
+                "v",
+                Arrays.asList(
+                        changelogRow("+I", 0, "2023-01-02", "world"),
+                        changelogRow("+I", 1, "2023-01-01", "flink"),
+                        changelogRow("+I", 1, "2023-01-02", "table"),
+                        changelogRow("+I", 1, "2023-01-02", "store")));
+    }
+
     @Test
     public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
         // input is dailyRatesChangelogWithoutUB()
@@ -1507,6 +1579,47 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
     // ------------------------ Tools ----------------------------------
 
+    private void validateOverwriteResult(
+            TableEnvironment tEnv,
+            String table,
+            String partitionSpec,
+            String selectSpec,
+            List<Row> expected)
+            throws Exception {
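+        // seed six rows spanning k1 in {0, 1} and k2 in {'2023-01-01', '2023-01-02'}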
+        tEnv.executeSql(
+                        String.format("INSERT INTO %s VALUES ", table)
+                                + "(0, '2023-01-01', 'hi'), "
+                                + "(0, '2023-01-01', 'hello'), "
+                                + "(0, '2023-01-02', 'world'), "
+                                + "(1, '2023-01-01', 'flink'), "
+                                + "(1, '2023-01-02', 'table'), "
+                                + "(1, '2023-01-02', 'store')")
+                .await();
+
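+        // sanity check: all six seeded rows are visible before the overwrite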
+        assertThat(
+                        CollectionUtil.iteratorToList(
+                                tEnv.executeSql("SELECT * FROM " + table).collect()))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList(
+                                changelogRow("+I", 0, "2023-01-01", "hi"),
+                                changelogRow("+I", 0, "2023-01-01", "hello"),
+                                changelogRow("+I", 0, "2023-01-02", "world"),
+                                changelogRow("+I", 1, "2023-01-01", "flink"),
+                                changelogRow("+I", 1, "2023-01-02", "table"),
+                                changelogRow("+I", 1, "2023-01-02", "store")));
+
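+        // an always-false predicate produces an empty result set, so the
+        // overwrite purges the targeted table or partition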
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT OVERWRITE %s %s SELECT %s FROM %s WHERE false",
+                                table, partitionSpec, selectSpec, table))
+                .await();
+
+        assertThat(
+                        CollectionUtil.iteratorToList(
+                                tEnv.executeSql("SELECT * FROM " + table).collect()))
+                .containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     private String collectAndCheckBatchReadWrite(
             boolean partitioned,
             boolean hasPk,