You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:07:33 UTC

[flink] branch master updated: [FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5bf5dc  [FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
e5bf5dc is described below

commit e5bf5dc8e9c42a9d1e63c4025a71f8bdb29a50dd
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jun 19 13:12:49 2020 +0200

    [FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
    
    Replaced TestingOverwritableTableSink with UnsafeMemoryAppendTableSink
    as the first uses DataSet#writeAsText. This sink cannot be used twice
    with the same path in a single JobGraph.
---
 .../table/runtime/batch/sql/TableEnvironmentITCase.scala    | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index a490467..7b5057b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -534,7 +534,6 @@ class TableEnvironmentITCase(
   }
 
   @Test
-  @Ignore
   def testStatementSetWithSameSinkTableNames(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
@@ -543,15 +542,15 @@ class TableEnvironmentITCase(
     val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val sinkPath = _tempFolder.newFile().getAbsolutePath
-    val configuredSink = new TestingOverwritableTableSink(sinkPath)
+    MemoryTableSourceSinkUtil.clear()
+    val configuredSink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
       .configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
-    assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
+    tEnv.asInstanceOf[TableEnvironmentInternal]
+      .registerTableSinkInternal("MySink", configuredSink)
 
     val stmtSet = tEnv.createStatementSet()
-    stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"), true)
-      .addInsertSql("INSERT OVERWRITE MySink SELECT a, b, c FROM MyTable where a <= 2")
+    stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"))
+      .addInsertSql("INSERT INTO MySink SELECT a, b, c FROM MyTable where a <= 2")
 
     val tableResult = stmtSet.execute()
     // wait job finished