You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text rendering.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/18 22:38:20 UTC
[flink] 01/02: [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f28b82909c3c6bcbe0436cae41af9a3c001f1c36
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Thu Aug 30 15:39:09 2018 +0200
[FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
This closes #6641.
---
.../table/plan/util/UpdatingPlanChecker.scala | 3 +-
.../runtime/stream/sql/InsertIntoITCase.scala | 406 +++++++++++++++++++++
.../flink/table/runtime/stream/sql/SqlITCase.scala | 29 --
3 files changed, 408 insertions(+), 30 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 4b7d0ed..c478987 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -142,7 +142,8 @@ object UpdatingPlanChecker {
.map(_.name)
// we have only a unique key if at least one window property is selected
if (windowProperties.nonEmpty) {
- Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
+ val windowId = windowProperties.min
+ Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, windowId)))
} else {
None
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
new file mode 100644
index 0000000..efba026
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestRetractSink, TestUpsertSink}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class InsertIntoITCase extends StreamingWithStateTestBase {
+ // Streaming SQL INSERT INTO tests: each test runs sqlUpdate into "targetTable" (append, retract or upsert sink) and checks what the sink received.
+ @Test
+ def testInsertIntoAppendStreamToTableSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSourceSinkUtil.clear()
+
+ val input = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(r => r._2)
+ // event time is taken from the second tuple field (b) and exposed as the rowtime attribute t
+ tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c, 't.rowtime)
+ // sink schema follows the SELECT list (c, t, b) positionally: STRING, rowtime as SQL_TIMESTAMP, LONG
+ val fieldNames = Array("d", "e", "t")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT c, t, b
+ |FROM sourceTable
+ |WHERE a < 3 OR a > 19
+ """.stripMargin)
+
+ env.execute()
+
+ val expected = Seq(
+ "Hi,1970-01-01 00:00:00.001,1",
+ "Hello,1970-01-01 00:00:00.002,2",
+ "Comment#14,1970-01-01 00:00:00.006,6",
+ "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+ }
+ // non-windowed GROUP BY (updating result) into a retract sink; messages are folded with RowCollector.retractResults before comparing
+ @Test
+ def testInsertIntoUpdatingTableToRetractSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("len", "cntid", "sumnum"),
+ Array(Types.INT, Types.LONG, Types.LONG),
+ new TestRetractSink)
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum
+ |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)
+ |GROUP BY len
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ val retracted = RowCollector.retractResults(results).sorted
+ val expected = List(
+ "2,1,1",
+ "5,1,2",
+ "11,1,2",
+ "25,1,3",
+ "10,7,39",
+ "14,1,3",
+ "9,9,41").sorted
+ assertEquals(expected, retracted)
+
+ }
+ // tumbling event-time window aggregate into a retract sink; the result must be append-only (no retraction messages)
+ @Test
+ def testInsertIntoAppendTableToRetractSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("wend", "cntid", "sumnum"),
+ Array(Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+ new TestRetractSink
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT
+ | TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+ | COUNT(id) AS cntid,
+ | SUM(num) AS sumnum
+ |FROM sourceTable
+ |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+
+ val retracted = RowCollector.retractResults(results).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,4,8",
+ "1970-01-01 00:00:00.01,5,18",
+ "1970-01-01 00:00:00.015,5,24",
+ "1970-01-01 00:00:00.02,5,29",
+ "1970-01-01 00:00:00.025,2,12")
+ .sorted
+ assertEquals(expected, retracted)
+
+ }
+ // nested GROUP BY (updating result) into an upsert sink keyed on (cnt, cTrue); delete messages are expected
+ @Test
+ def testInsertIntoUpdatingTableWithFullKeyToUpsertSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("cnt", "cntid", "cTrue"),
+ Array(Types.LONG, Types.LONG, Types.BOOLEAN),
+ new TestUpsertSink(Array("cnt", "cTrue"), false)
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT cnt, COUNT(len) AS cntid, cTrue
+ |FROM
+ | (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt
+ | FROM sourceTable
+ | GROUP BY CHAR_LENGTH(text), (id > 0)
+ | )
+ |GROUP BY cnt, cTrue
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertTrue(
+ "Results must include delete messages",
+ results.exists(_.f0 == false)
+ )
+ // key fields cnt and cTrue are at sink positions 0 and 2
+ val retracted = RowCollector.upsertResults(results, Array(0, 2)).sorted
+ val expected = List(
+ "1,5,true",
+ "7,1,true",
+ "9,1,true").sorted
+ assertEquals(expected, retracted)
+
+ }
+ // tumbling window aggregate into an upsert sink keyed on (wend, num): window end plus group key forms the full key
+ @Test
+ def testInsertIntoAppendingTableWithFullKey1ToUpsertSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("num", "wend", "cntid"),
+ Array(Types.LONG, Types.SQL_TIMESTAMP, Types.LONG),
+ new TestUpsertSink(Array("wend", "num"), true)
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT
+ | num,
+ | TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+ | COUNT(id) AS cntid
+ |FROM sourceTable
+ |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+ // key fields num and wend are at sink positions 0 and 1
+ val retracted = RowCollector.upsertResults(results, Array(0, 1)).sorted
+ val expected = List(
+ "1,1970-01-01 00:00:00.005,1",
+ "2,1970-01-01 00:00:00.005,2",
+ "3,1970-01-01 00:00:00.005,1",
+ "3,1970-01-01 00:00:00.01,2",
+ "4,1970-01-01 00:00:00.01,3",
+ "4,1970-01-01 00:00:00.015,1",
+ "5,1970-01-01 00:00:00.015,4",
+ "5,1970-01-01 00:00:00.02,1",
+ "6,1970-01-01 00:00:00.02,4",
+ "6,1970-01-01 00:00:00.025,2").sorted
+ assertEquals(expected, retracted)
+ }
+ // like the previous test, but the upsert key contains both window start and window end plus num (cf. FLINK-10259)
+ @Test
+ def testInsertIntoAppendingTableWithFullKey2ToUpsertSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("wstart", "wend", "num", "cntid"),
+ Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+ new TestUpsertSink(Array("wstart", "wend", "num"), true)
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT
+ | TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,
+ | TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+ | num,
+ | COUNT(id) AS cntid
+ |FROM sourceTable
+ |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+ // key fields wstart, wend and num are at sink positions 0, 1 and 2
+ val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+ "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+ "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+ "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+ assertEquals(expected, retracted)
+ }
+ // num is grouped on but not selected, so the output has no full key; the raw sink messages are compared
+ @Test
+ def testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("wend", "cntid"),
+ Array(Types.SQL_TIMESTAMP, Types.LONG),
+ new TestUpsertSink(null, true)
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT
+ | TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+ | COUNT(id) AS cntid
+ |FROM sourceTable
+ |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+ // several num groups share a window, so identical (wend, cntid) rows can occur
+ val retracted = results.map(_.f1.toString).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,1",
+ "1970-01-01 00:00:00.005,2",
+ "1970-01-01 00:00:00.005,1",
+ "1970-01-01 00:00:00.01,2",
+ "1970-01-01 00:00:00.01,3",
+ "1970-01-01 00:00:00.015,1",
+ "1970-01-01 00:00:00.015,4",
+ "1970-01-01 00:00:00.02,1",
+ "1970-01-01 00:00:00.02,4",
+ "1970-01-01 00:00:00.025,2").sorted
+ assertEquals(expected, retracted)
+ }
+ // no window property is selected, so the output has no full key; the raw sink messages are compared
+ @Test
+ def testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+
+ tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "targetTable",
+ Array("num", "cntid"),
+ Array(Types.LONG, Types.LONG),
+ new TestUpsertSink(null, true)
+ )
+
+ tEnv.sqlUpdate(
+ s"""INSERT INTO targetTable
+ |SELECT
+ | num,
+ | COUNT(id) AS cntid
+ |FROM sourceTable
+ |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+ """.stripMargin)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+ // the window is not part of the output, so the same num can appear once per window
+ val retracted = results.map(_.f1.toString).sorted
+ val expected = List(
+ "1,1",
+ "2,2",
+ "3,1",
+ "3,2",
+ "4,3",
+ "4,1",
+ "5,4",
+ "5,1",
+ "6,4",
+ "6,2").sorted
+ assertEquals(expected, retracted)
+ }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index f187055..de0b392 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -714,35 +714,6 @@ class SqlITCase extends StreamingWithStateTestBase {
}
@Test
- def testInsertIntoMemoryTable(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSourceSinkUtil.clear()
-
- val t = StreamTestData.getSmall3TupleDataStream(env)
- .assignAscendingTimestamps(x => x._2)
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
- tEnv.registerTable("sourceTable", t)
-
- val fieldNames = Array("d", "e", "f", "t")
- val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
- .asInstanceOf[Array[TypeInformation[_]]]
- val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
-
- val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
- tEnv.sqlUpdate(sql)
- env.execute()
-
- val expected = List(
- "1,1,Hi,1970-01-01 00:00:00.001",
- "2,2,Hello,1970-01-01 00:00:00.002",
- "3,2,Hello world,1970-01-01 00:00:00.002")
- assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
- }
-
- @Test
def testWriteReadTableSourceSink(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)