You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/06 14:20:38 UTC

[flink] branch master updated: [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d036417  [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
d036417 is described below

commit d036417985d3e2b1ca63909007db9710e842abf4
Author: xueyu <27...@qq.com>
AuthorDate: Mon Sep 3 14:06:29 2018 +0800

    [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
    
    This closes #6648.
---
 .../apache/flink/table/api/TableEnvironment.scala  |  4 +--
 .../stream/sql/validation/SortValidationTest.scala |  1 -
 .../table/runtime/stream/sql/SortITCase.scala      | 36 ++++++++++++++++++++--
 .../table/utils/MemoryTableSourceSinkUtil.scala    |  2 ++
 4 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d02..195812d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
       case insert: SqlInsert =>
         // validate the SQL query
         val query = insert.getSource
-        planner.validate(query)
+        val validatedQuery = planner.validate(query)
 
         // get query result as Table
-        val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+        val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
 
         // get name of sink table
         val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed94..6c477fd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "")
   }
 
-
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
   def testSortProcessingTimeSecondaryField(): Unit = {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a0..e7b79a5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
       "20")
     assertEquals(expected, SortITCase.testResults)
   }
+
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable ORDER BY rowtime, a desc"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "3,2,Hello world,1970-01-01 00:00:00.002",
+      "2,2,Hello,1970-01-01 00:00:00.002")
+    assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+  }
 }
 
 object SortITCase {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad43..1edd79f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
     }
 
     override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      val inputParallelism = dataStream.getParallelism
       dataStream
         .addSink(new MemoryAppendSink)
+        .setParallelism(inputParallelism)
         .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
     }
   }