Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/06 14:28:03 UTC
[flink] branch release-1.6 updated: [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new ddc2a98 [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
ddc2a98 is described below
commit ddc2a987b07806ecaa748866353f34f1e3c5f0a6
Author: xueyu <27...@qq.com>
AuthorDate: Mon Sep 3 14:06:29 2018 +0800
[FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
This closes #6648.
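Before this change, translating an INSERT INTO statement whose source query contains an ORDER BY clause failed in sqlUpdate, because the unvalidated SqlNode was converted to a relational tree instead of the node returned by the validator. The shape of statement the fix enables, condensed from the new ITCase further down (table and field names come from that test; the source and sink registration is elided here):

    // assumes "sourceTable" (fields a, b, c, rowtime) and an appendable sink
    // "targetTable" are already registered on tEnv, as in the ITCase below
    tEnv.sqlUpdate(
      "INSERT INTO targetTable " +
      "SELECT a, b, c, rowtime FROM sourceTable " +
      "ORDER BY rowtime, a DESC")
    env.execute()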
---
.../apache/flink/table/api/TableEnvironment.scala | 4 +--
.../stream/sql/validation/SortValidationTest.scala | 1 -
.../table/runtime/stream/sql/SortITCase.scala | 36 ++++++++++++++++++++--
.../table/utils/MemoryTableSourceSinkUtil.scala | 2 ++
4 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d02..195812d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
case insert: SqlInsert =>
// validate the SQL query
val query = insert.getSource
- planner.validate(query)
+ val validatedQuery = planner.validate(query)
// get query result as Table
- val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+ val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
// get name of sink table
val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
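This one-line change is the core of the fix: planner.validate returns the validated (and possibly rewritten) SqlNode, and it is that node, not the raw parsed one, that has to be handed to planner.rel. Restated as a sketch using the names from the hunk above, the intended flow is:

    // validate the source query first; keep the SqlNode the validator returns
    val validatedQuery = planner.validate(query)
    // convert the *validated* node to a relational tree; converting the raw
    // `query` node is what broke INSERT INTO statements with an ORDER BY clause
    val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))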
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed94..6c477fd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, "")
}
-
// test should fail because time is not the primary order field
@Test(expected = classOf[TableException])
def testSortProcessingTimeSecondaryField(): Unit = {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a0..e7b79a5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
package org.apache.flink.table.runtime.stream.sql
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
"20")
assertEquals(expected, SortITCase.testResults)
}
+
+ @Test
+ def testInsertIntoMemoryTableOrderBy(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSourceSinkUtil.clear()
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ .assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f", "t")
+ val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]]
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+ "FROM sourceTable ORDER BY rowtime, a desc"
+ tEnv.sqlUpdate(sql)
+ env.execute()
+
+ val expected = List(
+ "1,1,Hi,1970-01-01 00:00:00.001",
+ "3,2,Hello world,1970-01-01 00:00:00.002",
+ "2,2,Hello,1970-01-01 00:00:00.002")
+ assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+ }
}
object SortITCase {
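A note on the expected rows in the new test: the statement orders by rowtime ascending with a descending as the tie-breaker, so of the two rows carrying timestamp 2 the one with a = 3 is emitted before the one with a = 2. The query from the test, with the ordering spelled out:

    // primary key: rowtime ASC; tie-breaker: a DESC, which is why
    // "3,2,Hello world" precedes "2,2,Hello" in the expected output
    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
      "FROM sourceTable ORDER BY rowtime, a DESC"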
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad43..1edd79f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
}
override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ val inputParallelism = dataStream.getParallelism
dataStream
.addSink(new MemoryAppendSink)
+ .setParallelism(inputParallelism)
.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
}
}
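The sink-side tweak matters for the new test: without it, the sink operator of UnsafeMemoryAppendTableSink picks up the environment's default parallelism, so the already sorted stream can be redistributed across several sink subtasks and the collected rows no longer reflect the ORDER BY. Pinning the sink to the input parallelism preserves the order. A minimal, self-contained DataStream sketch of the same idea (PrintSinkFunction and the element values are purely illustrative, not part of the commit):

    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
    import org.apache.flink.streaming.api.scala._

    object SinkParallelismSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // a parallelism-1 source stands in for the sorted stream produced by ORDER BY
        val sorted = env.fromElements("1,Hi", "2,Hello", "3,Hello world")
        sorted
          .addSink(new PrintSinkFunction[String])
          // keep the sink at the input's parallelism (1 here) so records are not
          // rebalanced across sink subtasks, which would destroy the ordering
          .setParallelism(1)
          .name("ordered-sink")
        env.execute("sink parallelism sketch")
      }
    }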