Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/18 22:38:20 UTC

[flink] 01/02: [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f28b82909c3c6bcbe0436cae41af9a3c001f1c36
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Thu Aug 30 15:39:09 2018 +0200

    [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
    
    This closes #6641.
---
 .../table/plan/util/UpdatingPlanChecker.scala      |   3 +-
 .../runtime/stream/sql/InsertIntoITCase.scala      | 406 +++++++++++++++++++++
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  29 --
 3 files changed, 408 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 4b7d0ed..c478987 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -142,7 +142,8 @@ object UpdatingPlanChecker {
             .map(_.name)
           // we have only a unique key if at least one window property is selected
           if (windowProperties.nonEmpty) {
-            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
+            val windowId = windowProperties.min
+            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, windowId)))
           } else {
             None
           }
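
The hunk above changes how UpdatingPlanChecker derives the unique key of a group-window
aggregate: instead of pairing every selected window property (start, end, rowtime) only
with itself, all window properties are now mapped to a common representative (the
lexicographically smallest property name), so they form one equivalence group and, as the
new ITCases below exercise, a target table keyed on the window start, the window end, or
both resolves to the same derived key. A minimal, self-contained Scala sketch of that idea
follows; the object and method names and the example field names are illustrative only,
not the actual UpdatingPlanChecker API.

    // Sketch: each selected field is paired with a representative of its key group.
    // Plain group keys represent themselves; all window properties share one
    // representative, so start/end/rowtime count as the same logical key attribute.
    object GroupWindowKeySketch {
      def keysOfWindowAggregate(
          groupKeys: Seq[String],        // e.g. Seq("num")
          windowProperties: Seq[String]  // e.g. Seq("wend", "wstart")
      ): Option[Seq[(String, String)]] = {
        // we have only a unique key if at least one window property is selected
        if (windowProperties.nonEmpty) {
          val windowId = windowProperties.min
          Some(groupKeys.map(k => (k, k)) ++ windowProperties.map(p => (p, windowId)))
        } else {
          None
        }
      }

      def main(args: Array[String]): Unit = {
        // wstart and wend now map to the same group ("wend"), so either one can
        // serve as the window part of an upsert key.
        println(keysOfWindowAggregate(Seq("num"), Seq("wend", "wstart")))
        // Some(List((num,num), (wend,wend), (wstart,wend)))
      }
    }
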
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
new file mode 100644
index 0000000..efba026
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestRetractSink, TestUpsertSink}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class InsertIntoITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testInsertIntoAppendStreamToTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+
+    tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c, 't.rowtime)
+
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT c, t, b
+         |FROM sourceTable
+         |WHERE a < 3 OR a > 19
+       """.stripMargin)
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("len", "cntid", "sumnum"),
+      Array(Types.INT, Types.LONG, Types.LONG),
+      new TestRetractSink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum
+         |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)
+         |GROUP BY len
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid", "sumnum"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestRetractSink
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid,
+         |  SUM(num) AS sumnum
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableWithFullKeyToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("cnt", "cntid", "cTrue"),
+      Array(Types.LONG, Types.LONG, Types.BOOLEAN),
+      new TestUpsertSink(Array("cnt", "cTrue"), false)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT cnt, COUNT(len) AS cntid, cTrue
+         |FROM
+         |  (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt
+         |   FROM sourceTable
+         |   GROUP BY CHAR_LENGTH(text), (id > 0)
+         |   )
+         |GROUP BY cnt, cTrue
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertTrue(
+      "Results must include delete messages",
+      results.exists(_.f0 == false)
+    )
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 2)).sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "wend", "cntid"),
+      Array(Types.LONG, Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(Array("wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1)).sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.01,2",
+      "4,1970-01-01 00:00:00.01,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.02,1",
+      "6,1970-01-01 00:00:00.02,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wstart", "wend", "num", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestUpsertSink(Array("wstart", "wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.005,2",
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.01,2",
+      "1970-01-01 00:00:00.01,3",
+      "1970-01-01 00:00:00.015,1",
+      "1970-01-01 00:00:00.015,4",
+      "1970-01-01 00:00:00.02,1",
+      "1970-01-01 00:00:00.02,4",
+      "1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "cntid"),
+      Array(Types.LONG, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1,1",
+      "2,2",
+      "3,1",
+      "3,2",
+      "4,3",
+      "4,1",
+      "5,4",
+      "5,1",
+      "6,4",
+      "6,2").sorted
+    assertEquals(expected, retracted)
+  }
+}
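
The upsert ITCases above assert on RowCollector.upsertResults(results, keyIndices), i.e.
the collected (flag, row) messages are first materialized into a final table keyed on the
given column positions (a false flag is asserted to be a delete message, a true flag an
upsert). A rough, self-contained sketch of such a materialization step; the object and
method names here are illustrative helpers, not the actual RowCollector API.

    // Sketch: replay (isUpsert, row) messages into a keyed table and return the
    // final rows, which is what the expected lists in the tests are compared against.
    object UpsertMaterializeSketch {
      def upsertResults(
          messages: Seq[(Boolean, Seq[Any])],  // true = upsert, false = delete by key
          keyIndices: Seq[Int]                 // positions of the key columns in the row
      ): Seq[String] = {
        val table = scala.collection.mutable.LinkedHashMap[Seq[Any], Seq[Any]]()
        for ((isUpsert, row) <- messages) {
          val key = keyIndices.map(row)        // project the key columns
          if (isUpsert) table(key) = row else table.remove(key)
        }
        table.values.map(_.mkString(",")).toSeq
      }

      def main(args: Array[String]): Unit = {
        val msgs = Seq(
          (true, Seq[Any](1L, "1970-01-01 00:00:00.005", 1L)),
          (true, Seq[Any](1L, "1970-01-01 00:00:00.005", 2L)) // same key: overwrites
        )
        println(upsertResults(msgs, Seq(0, 1))) // List(1,1970-01-01 00:00:00.005,2)
      }
    }
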
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index f187055..de0b392 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -714,35 +714,6 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   @Test
-  def testInsertIntoMemoryTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSourceSinkUtil.clear()
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-        .assignAscendingTimestamps(x => x._2)
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    tEnv.registerTable("sourceTable", t)
-
-    val fieldNames = Array("d", "e", "f", "t")
-    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
-      .asInstanceOf[Array[TypeInformation[_]]]
-    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
-
-    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
-    tEnv.sqlUpdate(sql)
-    env.execute()
-
-    val expected = List(
-      "1,1,Hi,1970-01-01 00:00:00.001",
-      "2,2,Hello,1970-01-01 00:00:00.002",
-      "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
-  }
-
-  @Test
   def testWriteReadTableSourceSink(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)