You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2022/10/14 04:57:04 UTC
[iceberg] branch master updated: Flink: Revise columns of TestFlinkUpsert so that tables are partitioned by date and are more understandable (#5486)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 120db78f58 Flink: Revise columns of TestFlinkUpsert so that tables are partitioned by date and are more understandable (#5486)
120db78f58 is described below
commit 120db78f58c0b313ce268eccba01bf6d3654e1c3
Author: Kunni <38...@users.noreply.github.com>
AuthorDate: Fri Oct 14 12:56:58 2022 +0800
Flink: Revise columns of TestFlinkUpsert so that tables are partitioned by date and are more understandable (#5486)
---
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 96 +++++++++--------
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 117 +++++++++++----------
2 files changed, 114 insertions(+), 99 deletions(-)
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index e5cd8741c2..f62bcaa49d 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -127,33 +127,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
sql(
- "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
- + "PARTITIONED BY (province) WITH %s",
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
try {
sql(
"INSERT INTO %s VALUES "
- + "(1, 'a', DATE '2022-03-01'),"
- + "(2, 'b', DATE '2022-03-01'),"
- + "(1, 'b', DATE '2022-03-01')",
+ + "(1, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-01'),"
+ + "(2, 'Jane', DATE '2022-03-01')",
tableName);
sql(
"INSERT INTO %s VALUES "
- + "(4, 'a', DATE '2022-03-02'),"
- + "(5, 'b', DATE '2022-03-02'),"
- + "(1, 'b', DATE '2022-03-02')",
+ + "(2, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-02'),"
+ + "(2, 'Jane', DATE '2022-03-02')",
tableName);
List<Row> rowsOn20220301 =
- Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
List<Row> rowsOn20220302 =
- Lists.newArrayList(
- Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
@@ -215,30 +214,31 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
@Test
public void testPrimaryKeyEqualToPartitionKey() {
// This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
- String tableName = "upsert_on_data_key";
+ String tableName = "upsert_on_id_key";
try {
sql(
- "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) "
+ + "PARTITIONED BY (id) WITH %s",
tableName, toWithClause(tableUpsertProps));
- sql("INSERT INTO %s VALUES " + "(1, 'aaa')," + "(2, 'aaa')," + "(3, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 'Bill')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+ Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));
- sql("INSERT INTO %s VALUES " + "(4, 'aaa')," + "(5, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+ Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));
- sql("INSERT INTO %s VALUES " + "(6, 'aaa')," + "(7, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+ Lists.newArrayList(
+ Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
@@ -250,40 +250,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql(
- "CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s( id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 1),"
- + "('aaa', DATE '2022-03-01', 2),"
- + "('bbb', DATE '2022-03-01', 3)",
+ + "(1, DATE '2022-03-01', 'Andy'),"
+ + "(1, DATE '2022-03-01', 'Bill'),"
+ + "(2, DATE '2022-03-01', 'Jane')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+ Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 4),"
- + "('bbb', DATE '2022-03-01', 5)",
+ + "(1, DATE '2022-03-01', 'Jane'),"
+ + "(2, DATE '2022-03-01', 'Bill')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+ Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 6),"
- + "('bbb', DATE '2022-03-01', 7)",
+ + "(3, DATE '2022-03-01', 'Duke'),"
+ + "(4, DATE '2022-03-01', 'Leon')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+ Lists.newArrayList(
+ Row.of(1, dt, "Jane"),
+ Row.of(2, dt, "Bill"),
+ Row.of(3, dt, "Duke"),
+ Row.of(4, dt, "Leon")));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
@@ -298,40 +302,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql(
- "CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
sql(
"INSERT INTO %s VALUES "
- + "(1, 'aaa', DATE '2022-03-01'),"
- + "(2, 'aaa', DATE '2022-03-01'),"
- + "(3, 'bbb', DATE '2022-03-01')",
+ + "('Andy', 1, DATE '2022-03-01'),"
+ + "('Bill', 1, DATE '2022-03-01'),"
+ + "('Jane', 2, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+ Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
sql(
"INSERT INTO %s VALUES "
- + "(4, 'aaa', DATE '2022-03-01'),"
- + "(5, 'bbb', DATE '2022-03-01')",
+ + "('Jane', 1, DATE '2022-03-01'),"
+ + "('Bill', 2, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+ Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
sql(
"INSERT INTO %s VALUES "
- + "(6, 'aaa', DATE '2022-03-01'),"
- + "(7, 'bbb', DATE '2022-03-01')",
+ + "('Duke', 3, DATE '2022-03-01'),"
+ + "('Leon', 4, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+ Lists.newArrayList(
+ Row.of("Jane", 1, dt),
+ Row.of("Bill", 2, dt),
+ Row.of("Duke", 3, dt),
+ Row.of("Leon", 4, dt)));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index e5cd8741c2..a25ebab6c4 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -127,33 +127,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
sql(
- "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
- + "PARTITIONED BY (province) WITH %s",
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
try {
sql(
"INSERT INTO %s VALUES "
- + "(1, 'a', DATE '2022-03-01'),"
- + "(2, 'b', DATE '2022-03-01'),"
- + "(1, 'b', DATE '2022-03-01')",
+ + "(1, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-01'),"
+ + "(2, 'Jane', DATE '2022-03-01')",
tableName);
sql(
"INSERT INTO %s VALUES "
- + "(4, 'a', DATE '2022-03-02'),"
- + "(5, 'b', DATE '2022-03-02'),"
- + "(1, 'b', DATE '2022-03-02')",
+ + "(2, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-02'),"
+ + "(2, 'Jane', DATE '2022-03-02')",
tableName);
List<Row> rowsOn20220301 =
- Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
List<Row> rowsOn20220302 =
- Lists.newArrayList(
- Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
@@ -174,33 +173,32 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
Map<String, String> optionsUpsertProps = Maps.newHashMap(tableUpsertProps);
optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED);
sql(
- "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
- + "PARTITIONED BY (province) WITH %s",
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(optionsUpsertProps));
try {
sql(
"INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
- + "(1, 'a', DATE '2022-03-01'),"
- + "(2, 'b', DATE '2022-03-01'),"
- + "(1, 'b', DATE '2022-03-01')",
+ + "(1, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-01'),"
+ + "(2, 'Jane', DATE '2022-03-01')",
tableName);
sql(
"INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
- + "(4, 'a', DATE '2022-03-02'),"
- + "(5, 'b', DATE '2022-03-02'),"
- + "(1, 'b', DATE '2022-03-02')",
+ + "(2, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-02'),"
+ + "(2, 'Jane', DATE '2022-03-02')",
tableName);
List<Row> rowsOn20220301 =
- Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
List<Row> rowsOn20220302 =
- Lists.newArrayList(
- Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+ Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
@@ -215,30 +213,31 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
@Test
public void testPrimaryKeyEqualToPartitionKey() {
// This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
- String tableName = "upsert_on_data_key";
+ String tableName = "upsert_on_id_key";
try {
sql(
- "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) "
+ + "PARTITIONED BY (id) WITH %s",
tableName, toWithClause(tableUpsertProps));
- sql("INSERT INTO %s VALUES " + "(1, 'aaa')," + "(2, 'aaa')," + "(3, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 'Bill')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+ Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));
- sql("INSERT INTO %s VALUES " + "(4, 'aaa')," + "(5, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+ Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));
- sql("INSERT INTO %s VALUES " + "(6, 'aaa')," + "(7, 'bbb')", tableName);
+ sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+ Lists.newArrayList(
+ Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
@@ -250,40 +249,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql(
- "CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 1),"
- + "('aaa', DATE '2022-03-01', 2),"
- + "('bbb', DATE '2022-03-01', 3)",
+ + "(1, DATE '2022-03-01', 'Andy'),"
+ + "(1, DATE '2022-03-01', 'Bill'),"
+ + "(2, DATE '2022-03-01', 'Jane')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+ Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 4),"
- + "('bbb', DATE '2022-03-01', 5)",
+ + "(1, DATE '2022-03-01', 'Jane'),"
+ + "(2, DATE '2022-03-01', 'Bill')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+ Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
sql(
"INSERT INTO %s VALUES "
- + "('aaa', DATE '2022-03-01', 6),"
- + "('bbb', DATE '2022-03-01', 7)",
+ + "(3, DATE '2022-03-01', 'Duke'),"
+ + "(4, DATE '2022-03-01', 'Leon')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+ Lists.newArrayList(
+ Row.of(1, dt, "Jane"),
+ Row.of(2, dt, "Bill"),
+ Row.of(3, dt, "Duke"),
+ Row.of(4, dt, "Leon")));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
@@ -298,40 +301,44 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql(
- "CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) "
- + "PARTITIONED BY (data) WITH %s",
+ "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
tableName, toWithClause(tableUpsertProps));
sql(
"INSERT INTO %s VALUES "
- + "(1, 'aaa', DATE '2022-03-01'),"
- + "(2, 'aaa', DATE '2022-03-01'),"
- + "(3, 'bbb', DATE '2022-03-01')",
+ + "('Andy', 1, DATE '2022-03-01'),"
+ + "('Bill', 1, DATE '2022-03-01'),"
+ + "('Jane', 2, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+ Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
sql(
"INSERT INTO %s VALUES "
- + "(4, 'aaa', DATE '2022-03-01'),"
- + "(5, 'bbb', DATE '2022-03-01')",
+ + "('Jane', 1, DATE '2022-03-01'),"
+ + "('Bill', 2, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+ Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
sql(
"INSERT INTO %s VALUES "
- + "(6, 'aaa', DATE '2022-03-01'),"
- + "(7, 'bbb', DATE '2022-03-01')",
+ + "('Duke', 3, DATE '2022-03-01'),"
+ + "('Leon', 4, DATE '2022-03-01')",
tableName);
TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
- Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+ Lists.newArrayList(
+ Row.of("Jane", 1, dt),
+ Row.of("Bill", 2, dt),
+ Row.of("Duke", 3, dt),
+ Row.of("Leon", 4, dt)));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}