You are viewing a plain text version of this content. The canonical link for it is here.
Posted by GitBox on 2018/09/18 22:40:05 UTC

[GitHub] asfgit closed pull request #6641: [FLINK-10259] [table] Fix key extraction for GroupWindows.

asfgit closed pull request #6641: [FLINK-10259] [table] Fix key extraction for GroupWindows.

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 4b7d0ed681c..c47898730b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -142,7 +142,8 @@ object UpdatingPlanChecker {
           // we have only a unique key if at least one window property is selected
           if (windowProperties.nonEmpty) {
-            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
+            val windowId = windowProperties.min
+            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, windowId)))
           } else {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
new file mode 100644
index 00000000000..efba0265d3c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestRetractSink, TestUpsertSink}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConverters._
+class InsertIntoITCase extends StreamingWithStateTestBase {
+  @Test
+  def testInsertIntoAppendStreamToTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+    tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c, 't.rowtime)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT c, t, b
+         |FROM sourceTable
+         |WHERE a < 3 OR a > 19
+       """.stripMargin)
+    env.execute()
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+  @Test
+  def testInsertIntoUpdatingTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("len", "cntid", "sumnum"),
+      Array(Types.INT, Types.LONG, Types.LONG),
+      new TestRetractSink)
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum
+         |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)
+         |GROUP BY len
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoAppendTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid", "sumnum"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestRetractSink
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid,
+         |  SUM(num) AS sumnum
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoUpdatingTableWithFullKeyToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("cnt", "cntid", "cTrue"),
+      Array(Types.LONG, Types.LONG, Types.BOOLEAN),
+      new TestUpsertSink(Array("cnt", "cTrue"), false)
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT cnt, COUNT(len) AS cntid, cTrue
+         |FROM
+         |  (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt
+         |   FROM sourceTable
+         |   GROUP BY CHAR_LENGTH(text), (id > 0)
+         |   )
+         |GROUP BY cnt, cTrue
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertTrue(
+      "Results must include delete messages",
+      results.exists(_.f0 == false)
+    )
+    val retracted = RowCollector.upsertResults(results, Array(0, 2)).sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoAppendingTableWithFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "wend", "cntid"),
+      Array(Types.LONG, Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(Array("wend", "num"), true)
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+    val retracted = RowCollector.upsertResults(results, Array(0, 1)).sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.01,2",
+      "4,1970-01-01 00:00:00.01,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.02,1",
+      "6,1970-01-01 00:00:00.02,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoAppendingTableWithFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wstart", "wend", "num", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestUpsertSink(Array("wstart", "wend", "num"), true)
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+    val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.005,2",
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.01,2",
+      "1970-01-01 00:00:00.01,3",
+      "1970-01-01 00:00:00.015,1",
+      "1970-01-01 00:00:00.015,4",
+      "1970-01-01 00:00:00.02,1",
+      "1970-01-01 00:00:00.02,4",
+      "1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "cntid"),
+      Array(Types.LONG, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+    env.execute()
+    val results = RowCollector.getAndClearValues
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1,1",
+      "2,2",
+      "3,1",
+      "3,2",
+      "4,3",
+      "4,1",
+      "5,4",
+      "5,1",
+      "6,4",
+      "6,2").sorted
+    assertEquals(expected, retracted)
+  }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index a2d9bb26c4c..51bea2cdf20 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -713,35 +713,6 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  @Test
-  def testInsertIntoMemoryTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSourceSinkUtil.clear()
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-        .assignAscendingTimestamps(x => x._2)
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    tEnv.registerTable("sourceTable", t)
-    val fieldNames = Array("d", "e", "f", "t")
-    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
-      .asInstanceOf[Array[TypeInformation[_]]]
-    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
-    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
-    tEnv.sqlUpdate(sql)
-    env.execute()
-    val expected = List(
-      "1,1,Hi,1970-01-01 00:00:00.001",
-      "2,2,Hello,1970-01-01 00:00:00.002",
-      "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
-  }
   def testWriteReadTableSourceSink(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services