You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:07:33 UTC
[flink] branch master updated: [FLINK-18369] Fix instable
TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e5bf5dc [FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
e5bf5dc is described below
commit e5bf5dc8e9c42a9d1e63c4025a71f8bdb29a50dd
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jun 19 13:12:49 2020 +0200
[FLINK-18369] Fix instable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
Replaced TestingOverwritableTableSink with UnsafeMemoryAppendTableSink
as the first uses DataSet#writeAsText. This sink cannot be used twice
with the same path in a single JobGraph.
---
.../table/runtime/batch/sql/TableEnvironmentITCase.scala | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index a490467..7b5057b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -534,7 +534,6 @@ class TableEnvironmentITCase(
}
@Test
- @Ignore
def testStatementSetWithSameSinkTableNames(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -543,15 +542,15 @@ class TableEnvironmentITCase(
val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val sinkPath = _tempFolder.newFile().getAbsolutePath
- val configuredSink = new TestingOverwritableTableSink(sinkPath)
+ MemoryTableSourceSinkUtil.clear()
+ val configuredSink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
.configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
- assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
+ tEnv.asInstanceOf[TableEnvironmentInternal]
+ .registerTableSinkInternal("MySink", configuredSink)
val stmtSet = tEnv.createStatementSet()
- stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"), true)
- .addInsertSql("INSERT OVERWRITE MySink SELECT a, b, c FROM MyTable where a <= 2")
+ stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 2"))
+ .addInsertSql("INSERT INTO MySink SELECT a, b, c FROM MyTable where a <= 2")
val tableResult = stmtSet.execute()
// wait job finished