You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2022/10/14 04:57:04 UTC

[iceberg] branch master updated: Flink: Revise columns of TestFlinkUpsert to make tables are partitioned by date and more understandable (#5486) (#5486)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 120db78f58 Flink: Revise columns of TestFlinkUpsert to make tables are partitioned by date and more understandable (#5486) (#5486)
120db78f58 is described below

commit 120db78f58c0b313ce268eccba01bf6d3654e1c3
Author: Kunni <38...@users.noreply.github.com>
AuthorDate: Fri Oct 14 12:56:58 2022 +0800

    Flink: Revise columns of TestFlinkUpsert to make tables are partitioned by date and more understandable (#5486) (#5486)
---
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  |  96 +++++++++--------
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 117 +++++++++++----------
 2 files changed, 114 insertions(+), 99 deletions(-)

diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index e5cd8741c2..f62bcaa49d 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -127,33 +127,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
 
     sql(
-        "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
-            + "PARTITIONED BY (province) WITH %s",
+        "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+            + "PARTITIONED BY (dt) WITH %s",
         tableName, toWithClause(tableUpsertProps));
 
     try {
       sql(
           "INSERT INTO %s VALUES "
-              + "(1, 'a', DATE '2022-03-01'),"
-              + "(2, 'b', DATE '2022-03-01'),"
-              + "(1, 'b', DATE '2022-03-01')",
+              + "(1, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-01'),"
+              + "(2, 'Jane', DATE '2022-03-01')",
           tableName);
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(4, 'a', DATE '2022-03-02'),"
-              + "(5, 'b', DATE '2022-03-02'),"
-              + "(1, 'b', DATE '2022-03-02')",
+              + "(2, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-02'),"
+              + "(2, 'Jane', DATE '2022-03-02')",
           tableName);
 
       List<Row> rowsOn20220301 =
-          Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
 
       List<Row> rowsOn20220302 =
-          Lists.newArrayList(
-              Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
 
@@ -215,30 +214,31 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
   @Test
   public void testPrimaryKeyEqualToPartitionKey() {
     // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
-    String tableName = "upsert_on_data_key";
+    String tableName = "upsert_on_id_key";
     try {
       sql(
-          "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) "
+              + "PARTITIONED BY (id) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
-      sql("INSERT INTO %s VALUES " + "(1, 'aaa')," + "(2, 'aaa')," + "(3, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 'Bill')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+          Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));
 
-      sql("INSERT INTO %s VALUES " + "(4, 'aaa')," + "(5, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+          Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));
 
-      sql("INSERT INTO %s VALUES " + "(6, 'aaa')," + "(7, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+          Lists.newArrayList(
+              Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
@@ -250,40 +250,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt = LocalDate.of(2022, 3, 1);
     try {
       sql(
-          "CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s( id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 1),"
-              + "('aaa', DATE '2022-03-01', 2),"
-              + "('bbb', DATE '2022-03-01', 3)",
+              + "(1, DATE '2022-03-01', 'Andy'),"
+              + "(1, DATE '2022-03-01', 'Bill'),"
+              + "(2, DATE '2022-03-01', 'Jane')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+          Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 4),"
-              + "('bbb', DATE '2022-03-01', 5)",
+              + "(1, DATE '2022-03-01', 'Jane'),"
+              + "(2, DATE '2022-03-01', 'Bill')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+          Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 6),"
-              + "('bbb', DATE '2022-03-01', 7)",
+              + "(3, DATE '2022-03-01', 'Duke'),"
+              + "(4, DATE '2022-03-01', 'Leon')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+          Lists.newArrayList(
+              Row.of(1, dt, "Jane"),
+              Row.of(2, dt, "Bill"),
+              Row.of(3, dt, "Duke"),
+              Row.of(4, dt, "Leon")));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
@@ -298,40 +302,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt = LocalDate.of(2022, 3, 1);
     try {
       sql(
-          "CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(1, 'aaa', DATE '2022-03-01'),"
-              + "(2, 'aaa', DATE '2022-03-01'),"
-              + "(3, 'bbb', DATE '2022-03-01')",
+              + "('Andy', 1, DATE '2022-03-01'),"
+              + "('Bill', 1, DATE '2022-03-01'),"
+              + "('Jane', 2, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+          Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(4, 'aaa', DATE '2022-03-01'),"
-              + "(5, 'bbb', DATE '2022-03-01')",
+              + "('Jane', 1, DATE '2022-03-01'),"
+              + "('Bill', 2, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+          Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(6, 'aaa', DATE '2022-03-01'),"
-              + "(7, 'bbb', DATE '2022-03-01')",
+              + "('Duke', 3, DATE '2022-03-01'),"
+              + "('Leon', 4, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+          Lists.newArrayList(
+              Row.of("Jane", 1, dt),
+              Row.of("Bill", 2, dt),
+              Row.of("Duke", 3, dt),
+              Row.of("Leon", 4, dt)));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index e5cd8741c2..a25ebab6c4 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -127,33 +127,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
 
     sql(
-        "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
-            + "PARTITIONED BY (province) WITH %s",
+        "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+            + "PARTITIONED BY (dt) WITH %s",
         tableName, toWithClause(tableUpsertProps));
 
     try {
       sql(
           "INSERT INTO %s VALUES "
-              + "(1, 'a', DATE '2022-03-01'),"
-              + "(2, 'b', DATE '2022-03-01'),"
-              + "(1, 'b', DATE '2022-03-01')",
+              + "(1, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-01'),"
+              + "(2, 'Jane', DATE '2022-03-01')",
           tableName);
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(4, 'a', DATE '2022-03-02'),"
-              + "(5, 'b', DATE '2022-03-02'),"
-              + "(1, 'b', DATE '2022-03-02')",
+              + "(2, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-02'),"
+              + "(2, 'Jane', DATE '2022-03-02')",
           tableName);
 
       List<Row> rowsOn20220301 =
-          Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
 
       List<Row> rowsOn20220302 =
-          Lists.newArrayList(
-              Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
 
@@ -174,33 +173,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     Map<String, String> optionsUpsertProps = Maps.newHashMap(tableUpsertProps);
     optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED);
     sql(
-        "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
-            + "PARTITIONED BY (province) WITH %s",
+        "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+            + "PARTITIONED BY (dt) WITH %s",
         tableName, toWithClause(optionsUpsertProps));
 
     try {
       sql(
           "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES "
-              + "(1, 'a', DATE '2022-03-01'),"
-              + "(2, 'b', DATE '2022-03-01'),"
-              + "(1, 'b', DATE '2022-03-01')",
+              + "(1, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-01'),"
+              + "(2, 'Jane', DATE '2022-03-01')",
           tableName);
 
       sql(
           "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES "
-              + "(4, 'a', DATE '2022-03-02'),"
-              + "(5, 'b', DATE '2022-03-02'),"
-              + "(1, 'b', DATE '2022-03-02')",
+              + "(2, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-02'),"
+              + "(2, 'Jane', DATE '2022-03-02')",
           tableName);
 
       List<Row> rowsOn20220301 =
-          Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
 
       List<Row> rowsOn20220302 =
-          Lists.newArrayList(
-              Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+          Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
       TestHelpers.assertRows(
           sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
 
@@ -215,30 +213,31 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
   @Test
   public void testPrimaryKeyEqualToPartitionKey() {
     // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
-    String tableName = "upsert_on_data_key";
+    String tableName = "upsert_on_id_key";
     try {
       sql(
-          "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) "
+              + "PARTITIONED BY (id) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
-      sql("INSERT INTO %s VALUES " + "(1, 'aaa')," + "(2, 'aaa')," + "(3, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 'Bill')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+          Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));
 
-      sql("INSERT INTO %s VALUES " + "(4, 'aaa')," + "(5, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+          Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));
 
-      sql("INSERT INTO %s VALUES " + "(6, 'aaa')," + "(7, 'bbb')", tableName);
+      sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+          Lists.newArrayList(
+              Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
@@ -250,40 +249,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt = LocalDate.of(2022, 3, 1);
     try {
       sql(
-          "CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 1),"
-              + "('aaa', DATE '2022-03-01', 2),"
-              + "('bbb', DATE '2022-03-01', 3)",
+              + "(1, DATE '2022-03-01', 'Andy'),"
+              + "(1, DATE '2022-03-01', 'Bill'),"
+              + "(2, DATE '2022-03-01', 'Jane')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+          Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 4),"
-              + "('bbb', DATE '2022-03-01', 5)",
+              + "(1, DATE '2022-03-01', 'Jane'),"
+              + "(2, DATE '2022-03-01', 'Bill')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+          Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "('aaa', DATE '2022-03-01', 6),"
-              + "('bbb', DATE '2022-03-01', 7)",
+              + "(3, DATE '2022-03-01', 'Duke'),"
+              + "(4, DATE '2022-03-01', 'Leon')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+          Lists.newArrayList(
+              Row.of(1, dt, "Jane"),
+              Row.of(2, dt, "Bill"),
+              Row.of(3, dt, "Duke"),
+              Row.of(4, dt, "Leon")));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
@@ -298,40 +301,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     LocalDate dt = LocalDate.of(2022, 3, 1);
     try {
       sql(
-          "CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) "
-              + "PARTITIONED BY (data) WITH %s",
+          "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
           tableName, toWithClause(tableUpsertProps));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(1, 'aaa', DATE '2022-03-01'),"
-              + "(2, 'aaa', DATE '2022-03-01'),"
-              + "(3, 'bbb', DATE '2022-03-01')",
+              + "('Andy', 1, DATE '2022-03-01'),"
+              + "('Bill', 1, DATE '2022-03-01'),"
+              + "('Jane', 2, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+          Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(4, 'aaa', DATE '2022-03-01'),"
-              + "(5, 'bbb', DATE '2022-03-01')",
+              + "('Jane', 1, DATE '2022-03-01'),"
+              + "('Bill', 2, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+          Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
 
       sql(
           "INSERT INTO %s VALUES "
-              + "(6, 'aaa', DATE '2022-03-01'),"
-              + "(7, 'bbb', DATE '2022-03-01')",
+              + "('Duke', 3, DATE '2022-03-01'),"
+              + "('Leon', 4, DATE '2022-03-01')",
           tableName);
 
       TestHelpers.assertRows(
           sql("SELECT * FROM %s", tableName),
-          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+          Lists.newArrayList(
+              Row.of("Jane", 1, dt),
+              Row.of("Bill", 2, dt),
+              Row.of("Duke", 3, dt),
+              Row.of("Leon", 4, dt)));
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }