You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:17 UTC
[2/4] flink git commit: [FLINK-6442] [table] Add registration for
TableSinks and INSERT INTO support for SQL and Table API.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
index d3f9b9f..cfc8067 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -34,6 +34,6 @@ class SortValidationTest extends TableTestBase {
val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2cfe931
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because TableSink accepts fewer fields.
+ util.tableEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because types of result and TableSink do not match.
+ util.tableEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 0943ea6..1b99679 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -41,7 +41,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
- val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+ val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
val expected = unaryNode(
"DataStreamCalc",
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
- val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
+ val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table2 " +
s"UNION ALL SELECT a, b, c FROM $table")
val expected2 = binaryNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 640fd26..e066fe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -206,7 +206,7 @@ class JoinTest extends TableTestBase {
val query =
"SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
- val resultTable = streamUtil.tableEnv.sql(query)
+ val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val rexNode = joinNode.getCondition
@@ -230,7 +230,7 @@ class JoinTest extends TableTestBase {
query: String,
expectCondStr: String): Unit = {
- val resultTable = streamUtil.tableEnv.sql(query)
+ val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val joinInfo = joinNode.analyzeCondition
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..3045100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because table sink has too few fields.
+ tEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because field types of table sink are incompatible.
+ tEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedPartialInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+ // must fail because we don't support partial insert yet.
+ tEnv.sqlUpdate(sql)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index 413cca7..d04b6d0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
"sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+ streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
}
/**
@@ -55,7 +55,7 @@ class OverWindowValidationTest extends TableTestBase {
val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
- streamUtil.tableEnv.sql(sqlQuery)
+ streamUtil.tableEnv.sqlQuery(sqlQuery)
}
/**
@@ -66,6 +66,6 @@ class OverWindowValidationTest extends TableTestBase {
streamUtil.addFunction("overAgg", new OverAgg0)
val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
- streamUtil.tableEnv.sql(sqlQuery)
+ streamUtil.tableEnv.sqlQuery(sqlQuery)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index dbc7d46..f58feed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -145,7 +145,7 @@ class CorrelateValidationTest extends TableTestBase {
), "Undefined function: NONEXIST")
// SQL API call
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
"No match found for function signature nonexist(<NUMERIC>)")
@@ -160,7 +160,7 @@ class CorrelateValidationTest extends TableTestBase {
// SQL API call
// NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
null,
classOf[AssertionError])
@@ -172,7 +172,7 @@ class CorrelateValidationTest extends TableTestBase {
"Given parameters of function 'FUNC2' do not match any signature")
// SQL API call
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
"No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2fcfd6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because table sink has too few fields.
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because field types of table sink are incompatible.
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index ab87cd3..628925b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -18,10 +18,12 @@
package org.apache.flink.table.api.validation
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.stream.table.TestAppendSink
+import org.apache.flink.table.utils.MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
@@ -39,4 +41,27 @@ class TableSinksValidationTest extends TableTestBase {
.writeToSink(new TestAppendSink)
}
+ @Test(expected = classOf[TableException])
+ def testSinkTableRegistrationUsingExistedTableName(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Int, String)]("TargetTable", 'id, 'text)
+
+ val fieldNames = Array("a", "b", "c")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ // table name already registered
+ util.tableEnv
+ .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegistrationWithInconsistentFieldNamesAndTypesLength(): Unit = {
+ val util = streamTestUtil()
+
+ // inconsistent length of field names and types
+ val fieldNames = Array("a", "b", "c")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+
+ util.tableEnv
+ .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 44842f7..dd6e00e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Future
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql2rel.RelDecorrelator
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index a15f1d1..b4ad9ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -430,7 +430,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+ val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
util.tableEnv.registerTable("NewTable", newTable)
@@ -448,7 +448,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+ val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
util.tableEnv.registerTable("NewTable", newTable)
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 535bbf5..999a808 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -286,7 +286,7 @@ class RetractionRulesTest extends TableTestBase {
class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
def verifySqlTrait(query: String, expected: String): Unit = {
- verifyTableTrait(tableEnv.sql(query), expected)
+ verifyTableTrait(tableEnv.sqlQuery(query), expected)
}
def verifyTableTrait(resultTable: Table, expected: String): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index ab80c65..cfff326 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -275,7 +275,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
- val result = util.tableEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+ val result = util.tableEnv.sqlQuery("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
val expected = unaryNode(
"DataStreamCalc",
@@ -300,7 +300,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
- val result = util.tableEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+ val result = util.tableEnv.sqlQuery("SELECT MIN(proctime) FROM MyTable GROUP BY long")
val expected = unaryNode(
"DataStreamCalc",
@@ -325,7 +325,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
- val result = util.tableEnv.sql(
+ val result = util.tableEnv.sqlQuery(
"SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
"SUM(`int`) FROM MyTable " +
"GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
@@ -355,7 +355,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
- val result = util.tableEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+ val result = util.tableEnv.sqlQuery("SELECT MIN(rowtime), long FROM MyTable " +
"GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
val expected = unaryNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 39b8371..465a88c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231,1,21,21,11"
val results = result.toDataSet[Row].collect()
@@ -70,7 +70,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231"
val results = result.toDataSet[Row].collect()
@@ -88,7 +88,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231"
val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class AggregateITCase(
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
val results = result.toDataSet[Row].collect()
@@ -128,7 +128,7 @@ class AggregateITCase(
val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,3,2,1,3"
val results = result.toDataSet[Row].collect()
@@ -147,7 +147,7 @@ class AggregateITCase(
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "5.5,7"
val results = result.toDataSet[Row].collect()
@@ -165,7 +165,7 @@ class AggregateITCase(
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "2,2"
val results = result.toDataSet[Row].collect()
@@ -187,7 +187,7 @@ class AggregateITCase(
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,3,2"
val results = result.toDataSet[Row].collect()
@@ -205,7 +205,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231,21"
val results = result.toDataSet[Row].collect()
@@ -223,7 +223,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected =
"6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
@@ -243,7 +243,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected =
"6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
@@ -280,9 +280,9 @@ class AggregateITCase(
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
- val result2 = tEnv.sql(sqlQuery2)
- val result3 = tEnv.sql(sqlQuery3)
+ val result = tEnv.sqlQuery(sqlQuery)
+ val result2 = tEnv.sqlQuery(sqlQuery2)
+ val result3 = tEnv.sqlQuery(sqlQuery3)
val results = result.toDataSet[Row].collect()
val expected = Seq.empty
@@ -315,7 +315,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"1,1,1,1,1",
"2,2,1,2,2", "2,3,1,2,3",
@@ -348,7 +348,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"1,1,1,1,1","1,1,1,1,1",
"2,5,2,2,2","2,5,2,2,2",
@@ -383,7 +383,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"2,10,39,6,3,7",
"16,21,111,6,6,18"
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 711182c..b891a7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -53,7 +53,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -77,7 +77,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -101,7 +101,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -125,7 +125,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
@@ -146,7 +146,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- tEnv.sql(sqlQuery)
+ tEnv.sqlQuery(sqlQuery)
}
@Test
@@ -160,7 +160,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "\n"
val results = result.toDataSet[Row].collect()
@@ -178,7 +178,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -201,7 +201,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
val results = result.toDataSet[Row].collect()
@@ -219,7 +219,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
"6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
@@ -240,7 +240,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
val results = result.toDataSet[Row].collect()
@@ -258,7 +258,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
"9,4,Comment#3\n" + "17,6,Comment#11\n" +
@@ -281,7 +281,7 @@ class CalcITCase(
Timestamp.valueOf("1984-07-12 14:34:24")))
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
"1984-07-12,14:34:24,1984-07-12 14:34:24.0"
@@ -300,7 +300,7 @@ class CalcITCase(
val ds = env.fromElements("a", "b", "c")
tEnv.registerDataSet("MyTable", ds, 'text)
- val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+ val result = tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable")
val expected = "97\n98\n99"
val results = result.toDataSet[Row].collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 681b4b5..6a17cb4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -49,7 +49,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
val results = result.toDataSet[Row].collect()
@@ -69,7 +69,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n"
val results = result.toDataSet[Row].collect()
@@ -89,7 +89,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -130,7 +130,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
"2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -150,7 +150,7 @@ class JoinITCase(
tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "6,6"
val results = result.toDataSet[Row].collect()
@@ -170,7 +170,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "6,6"
val results = result.toDataSet[Row].collect()
@@ -196,7 +196,7 @@ class JoinITCase(
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -218,7 +218,7 @@ class JoinITCase(
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -240,7 +240,7 @@ class JoinITCase(
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -257,7 +257,7 @@ class JoinITCase(
"3,1,1,Hi\n" +
"3,2,2,Hello\n" +
"3,3,2,Hello world"
- val result = tEnv.sql(sqlQuery2).collect()
+ val result = tEnv.sqlQuery(sqlQuery2).collect()
TestBaseUtils.compareResultAsText(result.asJava, expected)
}
@@ -274,7 +274,7 @@ class JoinITCase(
"1,1,Hi,3\n" +
"2,2,Hello,3\n" +
"3,2,Hello world,3"
- val result = tEnv.sql(sqlQuery1).collect()
+ val result = tEnv.sqlQuery(sqlQuery1).collect()
TestBaseUtils.compareResultAsText(result.asJava, expected)
}
@@ -287,7 +287,7 @@ class JoinITCase(
tEnv.registerTable("A", table)
val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
- val result = tEnv.sql(sqlQuery1).count()
+ val result = tEnv.sqlQuery(sqlQuery1).count()
Assert.assertEquals(0, result)
}
@@ -305,7 +305,7 @@ class JoinITCase(
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null",
"2,null", "2,null",
@@ -331,7 +331,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,3", "3,3",
"3,3", "4,null", "4,null", "4,null",
@@ -355,7 +355,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,3", "2,3", "2,3", "3,null", "3,null",
"3,null", "4,null", "4,null", "4,null",
@@ -380,7 +380,7 @@ class JoinITCase(
tEnv.registerTable("A", ds2)
tEnv.registerTable("B", ds1)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"2,null", "3,null", "1,null").mkString("\n")
@@ -402,7 +402,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,3", "3,3",
@@ -427,7 +427,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,null", "3,null",
@@ -453,7 +453,7 @@ class JoinITCase(
tEnv.registerTable("t1", ds1)
tEnv.registerTable("t2", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null,null",
"2,null,null", "2,null,null",
@@ -481,7 +481,7 @@ class JoinITCase(
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
val results = result.toDataSet[Row].collect().toList
@@ -508,7 +508,7 @@ class JoinITCase(
" UNNEST(tf.b) as A (x, y) " +
"WHERE x > a"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = List(
"1,[(12,45.6), (2,45.612)],12,45.6",
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index b0e6fe8..d965e0c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -52,7 +52,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -72,7 +72,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -94,7 +94,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hallo\n"
val results = result.toDataSet[Row].collect()
@@ -115,7 +115,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "18"
val results = result.toDataSet[Row].collect()
@@ -135,7 +135,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -161,7 +161,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'c)
tEnv.registerDataSet("t2", ds2, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1\n1"
val results = result.toDataSet[Row].collect()
@@ -183,7 +183,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n"
val results = result.toDataSet[Row].collect()
@@ -208,7 +208,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n"
val results = result.toDataSet[Row].collect()
@@ -234,7 +234,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'c)
tEnv.registerDataSet("t2", ds2, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1\n2\n2"
val results = result.toDataSet[Row].collect()
@@ -254,7 +254,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -271,7 +271,7 @@ class SetOperatorsITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
+ val result = tEnv.sqlQuery("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
val expected = Seq("1", "2", "2", "3", "3", "3").mkString("\n")
val results = result.toDataSet[Row].collect()
@@ -288,7 +288,7 @@ class SetOperatorsITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql("SELECT d IN (SELECT a FROM Table3) FROM Table5")
+ val result = tEnv.sqlQuery("SELECT d IN (SELECT a FROM Table3) FROM Table5")
val expected = Seq("false", "false", "false", "false", "false", "false", "false",
"false", "false", "true", "true", "true", "true", "true", "true").mkString("\n")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
index 4672ec3..66943fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -99,7 +99,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -130,7 +130,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -161,7 +161,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index b7f1bb1..be8278f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,8 +24,10 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -48,7 +50,7 @@ class TableEnvironmentITCase(
val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
- val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+ val result = tEnv.sqlQuery(sqlQuery).select('a.avg, 'b.sum, 'c.count)
val expected = "15,65,12"
val results = result.toDataSet[Row].collect()
@@ -68,7 +70,7 @@ class TableEnvironmentITCase(
val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
- val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+ val result = tEnv.sqlQuery(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
val expected = "16,60,12"
val results = result.toDataSet[Row].collect()
@@ -85,11 +87,11 @@ class TableEnvironmentITCase(
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
- val result1 = tEnv.sql(sqlQuery)
+ val result1 = tEnv.sqlQuery(sqlQuery)
tEnv.registerTable("ResTable", result1)
val sqlQuery2 = "SELECT count(aa) FROM ResTable"
- val result2 = tEnv.sql(sqlQuery2)
+ val result2 = tEnv.sqlQuery(sqlQuery2)
val expected = "6"
val results = result2.toDataSet[Row].collect()
@@ -106,11 +108,34 @@ class TableEnvironmentITCase(
val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello,true\n"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test
+ def testInsertIntoMemoryTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+
+ val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+ tEnv.sqlUpdate(sql)
+ env.execute()
+
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 504ab90..187d096 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -44,7 +44,7 @@ class TableSourceITCase(
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.registerTableSource("csvTable", csvTable)
- val results = tEnv.sql(
+ val results = tEnv.sqlQuery(
"SELECT id, `first`, `last`, score FROM csvTable").collect()
val expected = Seq(
@@ -67,7 +67,7 @@ class TableSourceITCase(
tableEnv.registerTableSource("NestedPersons", nestedTable)
- val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+ val result = tableEnv.sqlQuery("SELECT NestedPersons.firstName, NestedPersons.lastName," +
"NestedPersons.address.street, NestedPersons.address.city AS city " +
"FROM NestedPersons " +
"WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 116f690..e947c3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -446,7 +446,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello"
val results = result.toDataSet[Row].collect()
@@ -467,7 +467,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello"
val results = result.toDataSet[Row].collect()
@@ -488,7 +488,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where " +
"RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\nHello world"
val results = result.toDataSet[Row].collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 2e23161..725c580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -162,6 +164,28 @@ class TableEnvironmentITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test
+ def testInsertIntoMemoryTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+
+ val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ env.execute()
+
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ }
}
object TableEnvironmentITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 24d8695..ec65cf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
tEnv.registerTable("MyTable", table)
- val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+ val t = tEnv.sqlQuery("SELECT COUNT(`rowtime`) FROM MyTable " +
"GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
val results = t.toAppendStream[Row]
@@ -292,7 +292,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
tEnv.registerTable("T1", table)
val querySql = "select rowtime as ts, string as msg from T1"
- val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+ val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
results.addSink(new StreamITCase.StringSink[Pojo1])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ab7925b..e40da7a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -60,7 +60,7 @@ class JoinITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
@@ -97,7 +97,7 @@ class JoinITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index cc47a69..4884513 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -110,7 +110,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" MIN(c) OVER (" +
" ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -153,7 +153,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -181,7 +181,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"as cnt1 from T1)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -215,7 +215,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row](queryConfig)
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -240,7 +240,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -302,7 +302,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -363,7 +363,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -431,7 +431,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -492,7 +492,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -553,7 +553,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -619,7 +619,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -681,7 +681,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -742,7 +742,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -814,7 +814,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 2c59f8c..19db2a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -87,7 +87,7 @@ class SortITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 5398c6d..2c82d9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,16 +18,21 @@
package org.apache.flink.table.runtime.stream.sql
+import java.util
+
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+
+import scala.collection.JavaConverters._
import org.junit.Assert._
import org.junit._
@@ -58,7 +63,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = ds.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTableRow", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -79,7 +84,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
@@ -100,7 +105,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -121,7 +126,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -142,7 +147,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -166,7 +171,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -193,7 +198,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -219,7 +224,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.get3TupleDataStream(env)
tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -244,7 +249,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -276,7 +281,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -306,7 +311,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -365,5 +370,33 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
-}
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    // INSERT INTO must write the rows selected from sourceTable into the registered sink.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes: Array[TypeInformation[_]] =
+      Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index be876a8..30ada56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -43,7 +43,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.registerTableSource("persons", csvTable)
- tEnv.sql(
+ tEnv.sqlQuery(
"SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 830359f..c5b82fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.table
import java.io.File
import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +36,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.sinks._
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
@@ -49,6 +49,36 @@ import scala.collection.JavaConverters._
class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    // Table.insertInto must write the selected rows to the sink registered as "targetTable".
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+  }
+
+ @Test
def testStreamTableSink(): Unit = {
val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
new file mode 100644
index 0000000..29b2e94
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+  /** Rows collected by the sinks below, each stored in its `Row.toString` form. */
+  var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  /** Removes all collected rows; call sites written without parentheses still compile. */
+  def clear(): Unit = results.clear()
+
+  /** Usable as BatchTableSink and AppendStreamTableSink; emitted rows are added to `results`. */
+  final class UnsafeMemoryAppendTableSink
+    extends TableSinkBase[Row] with BatchTableSink[Row]
+    with AppendStreamTableSink[Row] {
+
+    override def getOutputType: TypeInformation[Row] =
+      new RowTypeInfo(getFieldTypes, getFieldNames)
+
+    /** Returns a new sink instance. */
+    override protected def copy: TableSinkBase[Row] = new UnsafeMemoryAppendTableSink
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit =
+      dataSet.output(new MemoryCollectionOutputFormat)
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit =
+      dataStream.addSink(new MemoryAppendSink)
+  }
+
+  /** Streaming sink function that appends each row's string form to `results`. */
+  private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+    override def invoke(value: Row): Unit = {
+      // Guard the shared list: MutableList provides no synchronization of its own.
+      results.synchronized {
+        results += value.toString
+      }
+    }
+  }
+
+  /** Batch output format that appends each row's string form to `results`. */
+  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+    override def writeRecord(record: Row): Unit = {
+      results.synchronized {
+        results += record.toString
+      }
+    }
+
+    override def close(): Unit = {}
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c4e2433..ff7c79d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.utils
import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
@@ -40,4 +41,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
+  /** Left unimplemented, like the mock's other members: `???` throws NotImplementedError. */
+  override def registerTableSink(
+      name: String, fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = ???
}