You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/23 16:31:40 UTC
[flink] 03/08: [hotfix][table] Code cleanup: use new methods
introduced in FLIP-84 instead of deprecated methods
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc886eb9ee7308618353ae6fd688976abc6bd843
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 17:27:14 2020 +0800
[hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods
---
.../jdbc/table/JdbcLookupTableITCase.java | 2 +-
.../flink/sql/tests/BatchSQLTestProgram.java | 2 +-
.../flink/table/api/TableEnvironmentTest.scala | 5 +-
.../validation/LegacyTableSinkValidationTest.scala | 3 +-
.../planner/runtime/FileSystemITCaseBase.scala | 4 +-
.../runtime/batch/table/TableSinkITCase.scala | 28 +++----
.../runtime/stream/table/TableSinkITCase.scala | 86 ++++++++--------------
7 files changed, 45 insertions(+), 85 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8babd64..3e3a449 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -144,7 +144,7 @@ public class JdbcLookupTableITCase extends JdbcLookupTestBase {
tEnv.createTemporaryView("T", t);
String cacheConfig = ", 'lookup.cache.max-rows'='4', 'lookup.cache.ttl'='10000', 'lookup.max-retries'='5'";
- tEnv.sqlUpdate(
+ tEnv.executeSql(
String.format("create table lookup (" +
" id1 INT," +
" id2 VARCHAR," +
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 1c9d92f..9698b48 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -48,7 +48,7 @@ import java.util.NoSuchElementException;
*
* <p>Parameters:
* -outputPath output file path for CsvTableSink;
- * -sqlStatement SQL statement that will be executed as sqlUpdate
+ * -sqlStatement SQL statement that will be executed as executeSql
*/
public class BatchSQLTestProgram {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index b2073dc..f7731f1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -99,11 +99,8 @@ class TableEnvironmentTest {
TestTableSourceSinks.createCsvTemporarySinkTable(
tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
- val table1 = tEnv.sqlQuery("select first from MyTable")
- tEnv.insertInto(table1, "MySink")
-
val expected = TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
- val actual = tEnv.explain(false)
+ val actual = tEnv.explainSql("insert into MySink select first from MyTable")
assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual))
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
index ce0293c..e17bdb5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
@@ -65,9 +65,8 @@ class LegacyTableSinkValidationTest extends TableTestBase {
val schema = result.getSchema
sink.configure(schema.getFieldNames, schema.getFieldTypes)
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("testSink", sink)
- tEnv.insertInto("testSink", result)
// must fail because table is updating table without full key
- env.execute()
+ result.executeInsert("testSink")
}
@Test(expected = classOf[TableException])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 28af0e9..b41aaf8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -243,8 +243,8 @@ trait FileSystemITCaseBase {
@Test
def testProjectPushDown(): Unit = {
- tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
- tableEnv.execute("test")
+ execInsertSqlAndWaitResult(
+ tableEnv, "insert into partitionedTable select x, y, a, b from originalT")
check(
"select y, b, x from partitionedTable where a=3",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
index f6aaf0f..9082c8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -48,12 +48,10 @@ class TableSinkITCase extends BatchTestBase {
registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
- tEnv.from("MyTable")
+ val table = tEnv.from("MyTable")
.where('a > 20)
.select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
- .insertInto("sink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "sink")
val result = TestValuesTableFactory.getResults("sink")
val expected = Seq("12345,55,12345")
@@ -76,12 +74,10 @@ class TableSinkITCase extends BatchTestBase {
registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
- tEnv.from("MyTable")
+ val table = tEnv.from("MyTable")
.where('a > 20)
.select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
- .insertInto("sink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "sink")
val result = TestValuesTableFactory.getResults("sink")
val expected = Seq("12345,55,12345")
@@ -104,11 +100,10 @@ class TableSinkITCase extends BatchTestBase {
registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
- tEnv.from("MyTable")
+ val table = tEnv.from("MyTable")
.groupBy('a)
.select('a, 'b.sum())
- .insertInto("testSink")
- tEnv.execute("")
+ execInsertTableAndWaitResult(table, "testSink")
val result = TestValuesTableFactory.getResults("testSink")
val expected = List(
@@ -135,11 +130,10 @@ class TableSinkITCase extends BatchTestBase {
registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
- tEnv.from("MyTable")
+ val table = tEnv.from("MyTable")
.groupBy('a)
.select('a, 'b.sum())
- .insertInto("testSink")
- tEnv.execute("")
+ execInsertTableAndWaitResult(table, "testSink")
val result = TestValuesTableFactory.getResults("testSink")
val expected = List(
@@ -177,11 +171,10 @@ class TableSinkITCase extends BatchTestBase {
| 'sink-insert-only' = 'true'
|)
|""".stripMargin)
- tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
// default should fail, because there are null values in the source
try {
- tEnv.execute("job name")
+ execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
fail("Execution should fail.")
} catch {
case t: Throwable =>
@@ -195,8 +188,7 @@ class TableSinkITCase extends BatchTestBase {
// enable drop enforcer to make the query can run
tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
- tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
- tEnv.execute("job name")
+ execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
val result = TestValuesTableFactory.getResults("not_null_sink")
val expected = List("book,1,12", "book,4,11", "fruit,3,44")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c4cbae2..5c1c245 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -55,12 +55,10 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.millis on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
.select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
- .insertInto("appendSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "appendSink")
val result = TestValuesTableFactory.getResults("appendSink")
val expected = List(
@@ -88,8 +86,7 @@ class TableSinkITCase extends StreamingTestBase {
| 'sink-insert-only' = 'true'
|)
|""".stripMargin)
- tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
- tEnv.execute("job name")
+ execInsertSqlAndWaitResult("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
val result = TestValuesTableFactory.getResults("appendSink")
val expected = List(
@@ -115,11 +112,9 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- ds1.join(ds2).where('b === 'e)
+ val table = ds1.join(ds2).where('b === 'e)
.select('c, 'g)
- .insertInto("appendSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "appendSink")
val result = TestValuesTableFactory.getResults("appendSink")
val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
@@ -144,12 +139,10 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.select('id, 'num, 'text.charLength() as 'len)
+ val table = t.select('id, 'num, 'text.charLength() as 'len)
.groupBy('len)
.select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
- .insertInto("retractSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "retractSink")
val result = TestValuesTableFactory.getResults("retractSink")
val expected = List(
@@ -177,12 +170,10 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.millis on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
.select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
- .insertInto("retractSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "retractSink")
val rawResult = TestValuesTableFactory.getRawResults("retractSink")
assertFalse(
@@ -218,15 +209,13 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+ val table = t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
.groupBy('len, 'cTrue)
// test query field name is different with registered sink field name
.select('len, 'id.count as 'count, 'cTrue)
.groupBy('count, 'cTrue)
.select('count, 'len.count as 'lencnt, 'cTrue)
- .insertInto("upsertSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "upsertSink")
val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
assertTrue(
@@ -257,13 +246,11 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.millis on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
// test query field name is different with registered sink field name
.select('num, 'w.end as 'window_end, 'id.count as 'icnt)
- .insertInto("upsertSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "upsertSink")
val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
assertFalse(
@@ -303,12 +290,10 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.millis on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
.select('w.end as 'wend, 'id.count as 'cnt)
- .insertInto("upsertSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "upsertSink")
val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
assertFalse(
@@ -347,12 +332,10 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.millis on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
.select('num, 'id.count as 'cnt)
- .insertInto("upsertSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "upsertSink")
val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
assertFalse(
@@ -399,12 +382,10 @@ class TableSinkITCase extends StreamingTestBase {
// 5, 5
// 6, 6
- t.groupBy('num)
+ val table = t.groupBy('num)
.select('num, 'id.count as 'cnt)
.where('cnt <= 3)
- .insertInto("upsertSink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "upsertSink")
val result = TestValuesTableFactory.getResults("upsertSink")
val expected = List("1,1", "2,2", "3,3")
@@ -429,15 +410,14 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- t.window(Tumble over 5.milli on 'rowtime as 'w)
+ val table = t.window(Tumble over 5.milli on 'rowtime as 'w)
.groupBy('num, 'w)
.select('num, 'w.rowtime as 'rowtime1, 'w.rowtime as 'rowtime2)
- .insertInto("sink")
thrown.expect(classOf[TableException])
thrown.expectMessage("Found more than one rowtime field: [rowtime1, rowtime2] " +
"in the query when insert into 'default_catalog.default_database.sink'")
- tEnv.execute("job name")
+ table.executeInsert("sink")
}
@Test
@@ -454,13 +434,11 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- env.fromCollection(tupleData3)
+ val table = env.fromCollection(tupleData3)
.toTable(tEnv, 'a, 'b, 'c)
.where('a > 20)
.select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
- .insertInto("sink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "sink")
val result = TestValuesTableFactory.getResults("sink")
val expected = Seq("12345,55,12345")
@@ -482,13 +460,11 @@ class TableSinkITCase extends StreamingTestBase {
|)
|""".stripMargin)
- env.fromCollection(tupleData3)
+ val table = env.fromCollection(tupleData3)
.toTable(tEnv, 'a, 'b, 'c)
.where('a > 20)
.select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
- .insertInto("sink")
-
- tEnv.execute("job name")
+ execInsertTableAndWaitResult(table, "sink")
val result = TestValuesTableFactory.getResults("sink")
val expected = Seq("12345,55,12345")
@@ -532,14 +508,13 @@ class TableSinkITCase extends StreamingTestBase {
| 'sink-insert-only' = 'false'
|)
|""".stripMargin)
- tEnv.sqlUpdate(
+ execInsertSqlAndWaitResult(
"""
|INSERT INTO changelog_sink
|SELECT product_id, user_name, SUM(order_price)
|FROM orders
|GROUP BY product_id, user_name
|""".stripMargin)
- tEnv.execute("job name")
val rawResult = TestValuesTableFactory.getRawResults("changelog_sink")
val expected = List(
@@ -584,14 +559,13 @@ class TableSinkITCase extends StreamingTestBase {
| 'sink-insert-only' = 'false'
|)
|""".stripMargin)
- tEnv.sqlUpdate(
+ execInsertSqlAndWaitResult(
"""
|INSERT INTO final_sink
|SELECT user_name, SUM(price) as total_pay
|FROM changelog_source
|GROUP BY user_name
|""".stripMargin)
- tEnv.execute("job name")
val finalResult = TestValuesTableFactory.getResults("final_sink")
val finalExpected = List(
"user1,28.12", "user2,71.20", "user3,32.33", "user4,9.99")
@@ -623,11 +597,10 @@ class TableSinkITCase extends StreamingTestBase {
| 'sink-insert-only' = 'true'
|)
|""".stripMargin)
- tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
// default should fail, because there are null values in the source
try {
- tEnv.execute("job name")
+ execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
fail("Execution should fail.")
} catch {
case t: Throwable =>
@@ -641,8 +614,7 @@ class TableSinkITCase extends StreamingTestBase {
// enable drop enforcer to make the query can run
tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
- tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
- tEnv.execute("job name")
+ execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
val result = TestValuesTableFactory.getResults("not_null_sink")
val expected = List("book,1,12", "book,4,11", "fruit,3,44")