You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/23 16:31:40 UTC

[flink] 03/08: [hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc886eb9ee7308618353ae6fd688976abc6bd843
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 17:27:14 2020 +0800

    [hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods
---
 .../jdbc/table/JdbcLookupTableITCase.java          |  2 +-
 .../flink/sql/tests/BatchSQLTestProgram.java       |  2 +-
 .../flink/table/api/TableEnvironmentTest.scala     |  5 +-
 .../validation/LegacyTableSinkValidationTest.scala |  3 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |  4 +-
 .../runtime/batch/table/TableSinkITCase.scala      | 28 +++----
 .../runtime/stream/table/TableSinkITCase.scala     | 86 ++++++++--------------
 7 files changed, 45 insertions(+), 85 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8babd64..3e3a449 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -144,7 +144,7 @@ public class JdbcLookupTableITCase extends JdbcLookupTestBase {
 		tEnv.createTemporaryView("T", t);
 
 		String cacheConfig = ", 'lookup.cache.max-rows'='4', 'lookup.cache.ttl'='10000', 'lookup.max-retries'='5'";
-		tEnv.sqlUpdate(
+		tEnv.executeSql(
 			String.format("create table lookup (" +
 				"  id1 INT," +
 				"  id2 VARCHAR," +
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 1c9d92f..9698b48 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -48,7 +48,7 @@ import java.util.NoSuchElementException;
  *
  * <p>Parameters:
  * -outputPath output file path for CsvTableSink;
- * -sqlStatement SQL statement that will be executed as sqlUpdate
+ * -sqlStatement SQL statement that will be executed as executeSql
  */
 public class BatchSQLTestProgram {
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index b2073dc..f7731f1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -99,11 +99,8 @@ class TableEnvironmentTest {
     TestTableSourceSinks.createCsvTemporarySinkTable(
       tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
 
-    val table1 = tEnv.sqlQuery("select first from MyTable")
-    tEnv.insertInto(table1, "MySink")
-
     val expected = TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
-    val actual = tEnv.explain(false)
+    val actual = tEnv.explainSql("insert into MySink select first from MyTable")
     assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
index ce0293c..e17bdb5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
@@ -65,9 +65,8 @@ class LegacyTableSinkValidationTest extends TableTestBase {
     val schema = result.getSchema
     sink.configure(schema.getFieldNames, schema.getFieldTypes)
     tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("testSink", sink)
-    tEnv.insertInto("testSink", result)
     // must fail because table is updating table without full key
-    env.execute()
+    result.executeInsert("testSink")
   }
 
   @Test(expected = classOf[TableException])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 28af0e9..b41aaf8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -243,8 +243,8 @@ trait FileSystemITCaseBase {
 
   @Test
   def testProjectPushDown(): Unit = {
-    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
-    tableEnv.execute("test")
+    execInsertSqlAndWaitResult(
+      tableEnv, "insert into partitionedTable select x, y, a, b from originalT")
 
     check(
       "select y, b, x from partitionedTable where a=3",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
index f6aaf0f..9082c8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -48,12 +48,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -76,12 +74,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -104,11 +100,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .groupBy('a)
       .select('a, 'b.sum())
-      .insertInto("testSink")
-    tEnv.execute("")
+    execInsertTableAndWaitResult(table, "testSink")
 
     val result = TestValuesTableFactory.getResults("testSink")
     val expected = List(
@@ -135,11 +130,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .groupBy('a)
       .select('a, 'b.sum())
-      .insertInto("testSink")
-    tEnv.execute("")
+    execInsertTableAndWaitResult(table, "testSink")
 
     val result = TestValuesTableFactory.getResults("testSink")
     val expected = List(
@@ -177,11 +171,10 @@ class TableSinkITCase extends BatchTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     // default should fail, because there are null values in the source
     try {
-      tEnv.execute("job name")
+      execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
       fail("Execution should fail.")
     } catch {
       case t: Throwable =>
@@ -195,8 +188,7 @@ class TableSinkITCase extends BatchTestBase {
 
     // enable drop enforcer to make the query can run
     tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     val result = TestValuesTableFactory.getResults("not_null_sink")
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c4cbae2..5c1c245 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -55,12 +55,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w)
       .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("appendSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "appendSink")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List(
@@ -88,8 +86,7 @@ class TableSinkITCase extends StreamingTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List(
@@ -115,11 +112,9 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    ds1.join(ds2).where('b === 'e)
+    val table = ds1.join(ds2).where('b === 'e)
       .select('c, 'g)
-      .insertInto("appendSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "appendSink")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
@@ -144,12 +139,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.select('id, 'num, 'text.charLength() as 'len)
+    val table = t.select('id, 'num, 'text.charLength() as 'len)
       .groupBy('len)
       .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("retractSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "retractSink")
 
     val result = TestValuesTableFactory.getResults("retractSink")
     val expected = List(
@@ -177,12 +170,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w)
       .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("retractSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "retractSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("retractSink")
     assertFalse(
@@ -218,15 +209,13 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+    val table = t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
       .groupBy('len, 'cTrue)
       // test query field name is different with registered sink field name
       .select('len, 'id.count as 'count, 'cTrue)
       .groupBy('count, 'cTrue)
       .select('count, 'len.count as 'lencnt, 'cTrue)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertTrue(
@@ -257,13 +246,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       // test query field name is different with registered sink field name
       .select('num, 'w.end as 'window_end, 'id.count as 'icnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -303,12 +290,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       .select('w.end as 'wend, 'id.count as 'cnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -347,12 +332,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       .select('num, 'id.count as 'cnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -399,12 +382,10 @@ class TableSinkITCase extends StreamingTestBase {
     //   5, 5
     //   6, 6
 
-    t.groupBy('num)
+    val table = t.groupBy('num)
       .select('num, 'id.count as 'cnt)
       .where('cnt <= 3)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val result = TestValuesTableFactory.getResults("upsertSink")
     val expected = List("1,1", "2,2", "3,3")
@@ -429,15 +410,14 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.milli on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.milli on 'rowtime as 'w)
       .groupBy('num, 'w)
       .select('num, 'w.rowtime as 'rowtime1, 'w.rowtime as 'rowtime2)
-      .insertInto("sink")
 
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Found more than one rowtime field: [rowtime1, rowtime2] " +
       "in the query when insert into 'default_catalog.default_database.sink'")
-    tEnv.execute("job name")
+    table.executeInsert("sink")
   }
 
   @Test
@@ -454,13 +434,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    env.fromCollection(tupleData3)
+    val table = env.fromCollection(tupleData3)
       .toTable(tEnv, 'a, 'b, 'c)
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -482,13 +460,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    env.fromCollection(tupleData3)
+    val table = env.fromCollection(tupleData3)
       .toTable(tEnv, 'a, 'b, 'c)
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -532,14 +508,13 @@ class TableSinkITCase extends StreamingTestBase {
         |  'sink-insert-only' = 'false'
         |)
         |""".stripMargin)
-    tEnv.sqlUpdate(
+    execInsertSqlAndWaitResult(
       """
         |INSERT INTO changelog_sink
         |SELECT product_id, user_name, SUM(order_price)
         |FROM orders
         |GROUP BY product_id, user_name
         |""".stripMargin)
-    tEnv.execute("job name")
 
     val rawResult = TestValuesTableFactory.getRawResults("changelog_sink")
     val expected = List(
@@ -584,14 +559,13 @@ class TableSinkITCase extends StreamingTestBase {
         |  'sink-insert-only' = 'false'
         |)
         |""".stripMargin)
-    tEnv.sqlUpdate(
+    execInsertSqlAndWaitResult(
       """
         |INSERT INTO final_sink
         |SELECT user_name, SUM(price) as total_pay
         |FROM changelog_source
         |GROUP BY user_name
         |""".stripMargin)
-    tEnv.execute("job name")
     val finalResult = TestValuesTableFactory.getResults("final_sink")
     val finalExpected = List(
       "user1,28.12", "user2,71.20", "user3,32.33", "user4,9.99")
@@ -623,11 +597,10 @@ class TableSinkITCase extends StreamingTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     // default should fail, because there are null values in the source
     try {
-      tEnv.execute("job name")
+      execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
       fail("Execution should fail.")
     } catch {
       case t: Throwable =>
@@ -641,8 +614,7 @@ class TableSinkITCase extends StreamingTestBase {
 
     // enable drop enforcer to make the query can run
     tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     val result = TestValuesTableFactory.getResults("not_null_sink")
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")