Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/26 05:54:04 UTC
[flink] branch release-1.9 updated: [FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new cdcd15f [FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink
cdcd15f is described below
commit cdcd15ff5fc00cb33b361bf1c408d1801ad6032a
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Jul 22 16:04:34 2019 +0800
[FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink
This closes #9195
---
.../nodes/physical/stream/StreamExecSink.scala | 20 +
.../planner/plan/utils/UpdatingPlanChecker.scala | 202 +-------
.../runtime/stream/sql/Limit0RemoveITCase.scala | 8 +-
.../planner/runtime/stream/sql/RankITCase.scala | 4 +-
.../planner/runtime/stream/table/JoinITCase.scala | 4 +-
.../runtime/stream/table/TableSinkITCase.scala | 524 +++++++++++++++++++++
.../planner/runtime/utils/StreamTestSink.scala | 21 +-
7 files changed, 583 insertions(+), 200 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index e31264a..22a8ede 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -97,6 +97,26 @@ class StreamExecSink[T](
// check for append only table
val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
upsertSink.setIsAppendOnly(isAppendOnlyTable)
+
+ // extract unique key fields
+ // For now we pick the shortest one to give to the sink
+ // TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]
+ val tableKeys = {
+ UpdatingPlanChecker.getUniqueKeyFields(getInput, planner) match {
+ case Some(keys) => keys.sortBy(_.length).headOption
+ case None => None
+ }
+ }
+
+ // check that we have keys if the table produces changes (i.e., is not append-only)
+ tableKeys match {
+ case Some(keys) => upsertSink.setKeyFields(keys)
+ case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
+ case None if !isAppendOnlyTable => throw new TableException(
+ "UpsertStreamTableSink requires that Table has" +
+ " a full primary keys if it is updated.")
+ }
+
translateToTransformation(withChangeFlag = true, planner)
case _: AppendStreamTableSink[T] =>
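A note on the key-selection policy added above: when the metadata query reports several unique keys, the planner keeps the shortest one, since UpsertStreamTableSink#setKeyFields accepts only a single key (hence the TODO). A minimal, self-contained Scala sketch of that policy, using hypothetical key sets rather than a real metadata query:

    object ShortestKeyExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical result of a unique-key derivation: two candidate keys.
        val uniqueKeys: Option[Array[Array[String]]] =
          Some(Array(Array("cnt", "cTrue"), Array("cnt")))

        // Same policy as StreamExecSink above: sort by key length, keep the head.
        val tableKeys: Option[Array[String]] = uniqueKeys match {
          case Some(keys) => keys.sortBy(_.length).headOption
          case None => None
        }

        println(tableKeys.map(_.mkString(","))) // prints: Some(cnt)
      }
    }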
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
index e412e12..b0aaf9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
@@ -17,16 +17,15 @@
*/
package org.apache.flink.table.planner.plan.utils
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
import scala.collection.JavaConversions._
-import scala.collection.mutable
object UpdatingPlanChecker {
@@ -39,14 +38,15 @@ object UpdatingPlanChecker {
}
/** Extracts the unique keys of the table produced by the plan. */
- def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
- getUniqueKeyGroups(plan).map(_.map(_._1).toArray)
- }
-
- /** Extracts the unique keys and groups of the table produced by the plan. */
- def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = {
- val keyExtractor = new UniqueKeyExtractor
- keyExtractor.visit(plan)
+ def getUniqueKeyFields(relNode: RelNode, planner: StreamPlanner): Option[Array[Array[String]]] = {
+ val rowType = relNode.getRowType
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(planner.getRelBuilder.getCluster.getMetadataQuery)
+ val uniqueKeys = fmq.getUniqueKeys(relNode)
+ if (uniqueKeys != null && uniqueKeys.size() > 0) {
+ Some(uniqueKeys.filter(_.nonEmpty).map(_.toArray.map(rowType.getFieldNames.get)).toArray)
+ } else {
+ None
+ }
}
private class AppendOnlyValidator extends RelVisitor {
@@ -55,7 +55,7 @@ object UpdatingPlanChecker {
override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
node match {
- case s: StreamPhysicalRel if s.producesUpdates =>
+ case s: StreamPhysicalRel if s.producesUpdates || s.producesRetractions =>
isAppendOnly = false
case hep: HepRelVertex =>
visit(hep.getCurrentRel, ordinal, parent) //remove wrapper node
@@ -66,182 +66,4 @@ object UpdatingPlanChecker {
}
}
}
-
- /** Identifies unique key fields in the output of a RelNode. */
- private class UniqueKeyExtractor {
-
- // The visit() function returns a tuple: the first element is the name of a key field, the
- // second is a group name shared by all equivalent key fields. The group names are
- // used to identify equivalent keys; for example, in select('pk as pk1, 'pk as pk2), both pk1 and pk2
- // belong to the same group, i.e., pk1. Here we use the lexicographically smallest attribute as
- // the common group id. A node can have keys if it generates the keys by itself or it
- // forwards keys from its input(s).
- def visit(node: RelNode): Option[Seq[(String, String)]] = {
- node match {
- case c: StreamExecCalc =>
- val inputKeys = visit(node.getInput(0))
- // check if input has keys
- if (inputKeys.isDefined) {
- // track keys forward
- val inNames = c.getInput.getRowType.getFieldNames
- val inOutNames = c.getProgram.getNamedProjects.map(p => {
- c.getProgram.expandLocalRef(p.left) match {
- // output field is forwarded input field
- case i: RexInputRef => (i.getIndex, p.right)
- // output field is renamed input field
- case a: RexCall if a.getKind.equals(SqlKind.AS) =>
- a.getOperands.get(0) match {
- case ref: RexInputRef =>
- (ref.getIndex, p.right)
- case _ =>
- (-1, p.right)
- }
- // output field is not forwarded from input
- case _: RexNode => (-1, p.right)
- }
- })
- // filter all non-forwarded fields
- .filter(_._1 >= 0)
- // resolve names of input fields
- .map(io => (inNames.get(io._1), io._2))
-
- // filter by input keys
- val inputKeysAndOutput = inOutNames
- .filter(io => inputKeys.get.map(e => e._1).contains(io._1))
-
- val inputKeysMap = inputKeys.get.toMap
- val inOutGroups = inputKeysAndOutput.sorted.reverse
- .map(e => (inputKeysMap(e._1), e._2))
- .toMap
-
- // get output keys
- val outputKeys = inputKeysAndOutput
- .map(io => (io._2, inOutGroups(inputKeysMap(io._1))))
-
- // check if all keys have been preserved
- if (outputKeys.map(_._2).distinct.length == inputKeys.get.map(_._2).distinct.length) {
- // all keys have been preserved (but possibly renamed)
- Some(outputKeys)
- } else {
- // some (or all) keys have been removed, so the keys are no longer unique
- None
- }
- } else {
- None
- }
-
- case _: StreamExecOverAggregate =>
- // keys are always forwarded by Over aggregate
- visit(node.getInput(0))
- case a: StreamExecGroupAggregate =>
- // get grouping keys
- val groupKeys = a.getRowType.getFieldNames.take(a.grouping.length)
- Some(groupKeys.map(e => (e, e)))
-
- // TODO supports StreamExecGroupWindowAggregate
-
- case j: StreamExecJoin =>
- // get key(s) for join
- val lInKeys = visit(j.getLeft)
- val rInKeys = visit(j.getRight)
- if (lInKeys.isEmpty || rInKeys.isEmpty) {
- None
- } else {
- // Output of join must have keys if left and right both contain key(s).
- // Key groups from both sides will be merged by join equi-predicates
- val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
- val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
- val joinNames = j.getRowType.getFieldNames
-
- // if the right field names equal the left field names, Calcite renames the right
- // field names. For example, for T1(pk, a) join T2(pk, b), Calcite will rename T2(pk, b)
- // to T2(pk0, b).
- val rInNamesToJoinNamesMap = rInNames
- .zip(joinNames.subList(lInNames.size, joinNames.length))
- .toMap
-
- val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys
- .map(lInNames.get(_))
- val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys
- .map(rInNames.get(_))
- .map(rInNamesToJoinNamesMap(_))
-
- val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get
- .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2)))
-
- getOutputKeysForNonWindowJoin(
- joinNames,
- inKeys,
- lJoinKeys.zip(rJoinKeys)
- )
- }
- case _: StreamPhysicalRel =>
- // anything else does not forward keys, so we can stop
- None
- }
- }
-
- /**
- * Gets the output keys of a non-window join according to its inputs.
- *
- * @param inNames  field names of the join
- * @param inKeys   input keys of the join
- * @param joinKeys join keys of the join
- * @return the output keys of the join
- */
- def getOutputKeysForNonWindowJoin(
- inNames: Seq[String],
- inKeys: Seq[(String, String)],
- joinKeys: Seq[(String, String)])
- : Option[Seq[(String, String)]] = {
-
- val nameToGroups = mutable.HashMap.empty[String, String]
-
- // merge two groups
- def merge(nameA: String, nameB: String): Unit = {
- val ga: String = findGroup(nameA)
- val gb: String = findGroup(nameB)
- if (!ga.equals(gb)) {
- if (ga.compare(gb) < 0) {
- nameToGroups += (gb -> ga)
- } else {
- nameToGroups += (ga -> gb)
- }
- }
- }
-
- def findGroup(x: String): String = {
- // find the group of x
- var r: String = x
- while (!nameToGroups(r).equals(r)) {
- r = nameToGroups(r)
- }
-
- // point all names to the group name directly
- var a: String = x
- var b: String = null
- while (!nameToGroups(a).equals(r)) {
- b = nameToGroups(a)
- nameToGroups += (a -> r)
- a = b
- }
- r
- }
-
- // init groups
- inNames.foreach(e => nameToGroups += (e -> e))
- inKeys.foreach(e => nameToGroups += (e._1 -> e._2))
- // merge groups
- joinKeys.foreach(e => merge(e._1, e._2))
- // make sure all names point to the group name directly
- inNames.foreach(findGroup)
-
- val outputGroups = inKeys.map(e => nameToGroups(e._1)).distinct
- Some(
- inNames
- .filter(e => outputGroups.contains(nameToGroups(e)))
- .map(e => (e, nameToGroups(e)))
- )
- }
- }
}
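For the record, the deleted UniqueKeyExtractor tracked equivalent key fields with a small union-find over field names: every field starts in its own group, and join equi-predicates merge groups, keeping the lexicographically smallest name as the group id. A self-contained sketch of that idea (field names are illustrative, not from the commit):

    import scala.collection.mutable

    object KeyGroupExample {
      def main(args: Array[String]): Unit = {
        // every field starts as its own group
        val nameToGroups = mutable.HashMap.empty[String, String]
        Seq("pk", "a", "pk0", "b").foreach(n => nameToGroups += (n -> n))

        def findGroup(x: String): String = {
          var r = x
          while (nameToGroups(r) != r) r = nameToGroups(r)
          r
        }

        // merge two groups, keeping the lexicographically smaller name as root
        def merge(a: String, b: String): Unit = {
          val (ga, gb) = (findGroup(a), findGroup(b))
          if (ga != gb) {
            if (ga < gb) nameToGroups += (gb -> ga) else nameToGroups += (ga -> gb)
          }
        }

        // a join predicate pk === pk0 merges the two key groups
        merge("pk", "pk0")
        println(findGroup("pk0")) // prints: pk
      }
    }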
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
index 3b1763e..824907d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingRetractTableSink}
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -134,7 +134,7 @@ class Limit0RemoveITCase extends StreamingTestBase() {
val sql = "SELECT * FROM MyTable1 WHERE EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
val result = tEnv.sqlQuery(sql)
- val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+ val sink = TestSinkUtil.configureSink(result, new TestingRetractTableSink())
tEnv.registerTableSink("MySink", sink)
tEnv.insertInto(result, "MySink")
tEnv.execute("test")
@@ -155,13 +155,13 @@ class Limit0RemoveITCase extends StreamingTestBase() {
val sql = "SELECT * FROM MyTable1 WHERE NOT EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
val result = tEnv.sqlQuery(sql)
- val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+ val sink = TestSinkUtil.configureSink(result, new TestingRetractTableSink())
tEnv.registerTableSink("MySink", sink)
tEnv.insertInto(result, "MySink")
tEnv.execute("test")
val expected = Seq("1", "2", "3", "4", "5", "6")
- assertEquals(expected, sink.getUpsertResults.sorted)
+ assertEquals(expected, sink.getRetractResults.sorted)
}
@Test
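Why these tests switch from TestingUpsertTableSink to TestingRetractTableSink: with this change the planner throws a TableException when an updating query with no full unique key is written to an upsert sink, presumably the case for these query results, so a retract sink is the right tool. A retract sink consumes (flag, row) pairs and replays them into a final table. A self-contained sketch of that replay with hypothetical rows (it mirrors what getRetractResults computes, assuming a retraction removes one matching row):

    object RetractReplayExample {
      def main(args: Array[String]): Unit = {
        // (flag, row) stream: true = accumulate, false = retract (hypothetical data)
        val raw = Seq((true, "1"), (true, "2"), (false, "1"), (true, "1"))

        // replaying the changelog yields the final table
        val result = raw.foldLeft(List.empty[String]) {
          case (acc, (true, row))  => acc :+ row
          case (acc, (false, row)) => acc diff List(row)
        }

        println(result.sorted.mkString(",")) // prints: 1,2
      }
    }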
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
index b8677f92..ceeeed3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
@@ -1069,7 +1069,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
val table = tEnv.sqlQuery(sql)
val schema = table.getSchema
- val sink = new TestingUpsertTableSink(Array(0, 2)).
+ val sink = new TestingRetractTableSink().
configure(schema.getFieldNames,
schema.getFieldDataTypes.map(_.nullable()).map(fromDataTypeToTypeInfo))
tEnv.registerTableSink("MySink", sink)
@@ -1092,7 +1092,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
"book,20,5",
"fruit,40,1",
"fruit,44,3")
- assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted)
+ assertEquals(updatedExpected.sorted, sink.getRetractResults.sorted)
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 54b7f9e..306b559 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -871,14 +871,14 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
.where('leftPk === 'rightPk)
.select('leftPk, 'leftA, 'rightPk, 'rightA)
val schema = t.getSchema
- val sink = new TestingUpsertTableSink(Array(0, 1, 2)).configure(
+ val sink = new TestingRetractTableSink().configure(
schema.getFieldNames, schema.getFieldTypes)
tEnv.registerTableSink("MySink", sink)
tEnv.insertInto(t, "MySink")
tEnv.execute("test")
val expected = Seq("1,4,1,2", "1,5,1,2")
- assertEquals(expected.sorted, sink.getUpsertResults.sorted)
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
new file mode 100644
index 0000000..aa6b17d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
+import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.sinks._
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+class TableSinkITCase extends AbstractTestBase {
+
+ @Test
+ def testInsertIntoRegisteredTableSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = StreamTableEnvironment.create(env)
+ MemoryTableSourceSinkUtil.clear()
+
+ val input = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(r => r._2)
+ val fieldNames = Array("d", "e", "t")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+ input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+ .where('a < 3 || 'a > 19)
+ .select('c, 't, 'b)
+ .insertInto("targetTable")
+ env.execute()
+
+ val expected = Seq(
+ "Hi,1970-01-01 00:00:00.001,1",
+ "Hello,1970-01-01 00:00:00.002,2",
+ "Comment#14,1970-01-01 00:00:00.006,6",
+ "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+ }
+
+ @Test
+ def testStreamTableSink(): Unit = {
+
+ val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+ tmpFile.deleteOnExit()
+ val path = tmpFile.toURI.toString
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = StreamTableEnvironment.create(env)
+ env.setParallelism(4)
+
+ tEnv.registerTableSink(
+ "csvSink",
+ new CsvTableSink(path).configure(
+ Array[String]("c", "b"),
+ Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+
+ val input = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._2)
+ .map(x => x).setParallelism(4) // increase DOP to 4
+
+ input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+ .where('a < 5 || 'a > 17)
+ .select('c, 'b)
+ .insertInto("csvSink")
+
+ env.execute()
+
+ val expected = Seq(
+ "Hi,1970-01-01 00:00:00.001",
+ "Hello,1970-01-01 00:00:00.002",
+ "Hello world,1970-01-01 00:00:00.002",
+ "Hello world, how are you?,1970-01-01 00:00:00.003",
+ "Comment#12,1970-01-01 00:00:00.006",
+ "Comment#13,1970-01-01 00:00:00.006",
+ "Comment#14,1970-01-01 00:00:00.006",
+ "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
+
+ TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+ }
+
+ @Test
+ def testAppendSinkOnAppendTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingAppendTableSink(TimeZone.getDefault)
+ tEnv.registerTableSink(
+ "appendSink",
+ sink.configure(
+ Array[String]("t", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("appendSink")
+
+ env.execute()
+
+ val result = sink.getAppendResults.sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,4,8",
+ "1970-01-01 00:00:00.010,5,18",
+ "1970-01-01 00:00:00.015,5,24",
+ "1970-01-01 00:00:00.020,5,29",
+ "1970-01-01 00:00:00.025,2,12")
+ .sorted
+ assertEquals(expected, result)
+ }
+
+ @Test
+ def testAppendSinkOnAppendTableForInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = env.fromCollection(tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ val sink = new TestingAppendTableSink
+ tEnv.registerTableSink(
+ "appendSink",
+ sink.configure(
+ Array[String]("c", "g"),
+ Array[TypeInformation[_]](Types.STRING, Types.STRING)))
+
+ ds1.join(ds2).where('b === 'e)
+ .select('c, 'g)
+ .insertInto("appendSink")
+
+ env.execute()
+
+ val result = sink.getAppendResults.sorted
+ val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt").sorted
+ assertEquals(expected, result)
+ }
+
+ @Test
+ def testRetractSinkOnUpdatingTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ val sink = new TestingRetractTableSink()
+ tEnv.registerTableSink(
+ "retractSink",
+ sink.configure(
+ Array[String]("len", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
+ t.select('id, 'num, 'text.charLength() as 'len)
+ .groupBy('len)
+ .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("retractSink")
+
+ env.execute()
+
+ val retracted = sink.getRetractResults.sorted
+ val expected = List(
+ "2,1,1",
+ "5,1,2",
+ "11,1,2",
+ "25,1,3",
+ "10,7,39",
+ "14,1,3",
+ "9,9,41").sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test
+ def testRetractSinkOnAppendTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingRetractTableSink(TimeZone.getDefault)
+ tEnv.registerTableSink(
+ "retractSink",
+ sink.configure(
+ Array[String]("t", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("retractSink")
+
+ env.execute()
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getRetractResults.sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,4,8",
+ "1970-01-01 00:00:00.010,5,18",
+ "1970-01-01 00:00:00.015,5,24",
+ "1970-01-01 00:00:00.020,5,29",
+ "1970-01-01 00:00:00.025,2,12")
+ .sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test
+ def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ val sink = new TestingUpsertTableSink(Array(0, 2), TimeZone.getDefault)
+ sink.expectedKeys = Some(Array("cnt", "cTrue"))
+ sink.expectedIsAppendOnly = Some(false)
+ tEnv.registerTableSink(
+ "upsertSink",
+ sink.configure(
+ Array[String]("cnt", "lencnt", "cTrue"),
+ Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.BOOLEAN)))
+
+ t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+ .groupBy('len, 'cTrue)
+ .select('len, 'id.count as 'cnt, 'cTrue)
+ .groupBy('cnt, 'cTrue)
+ .select('cnt, 'len.count as 'lencnt, 'cTrue)
+ .insertInto("upsertSink")
+
+ env.execute()
+
+ assertTrue(
+ "Results must include delete messages",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getUpsertResults.sorted
+ val expected = List(
+ "1,5,true",
+ "7,1,true",
+ "9,1,true").sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingUpsertTableSink(Array(0, 1, 2), TimeZone.getDefault)
+ sink.expectedKeys = Some(Array("wend", "num"))
+ sink.expectedIsAppendOnly = Some(true)
+ tEnv.registerTableSink(
+ "upsertSink",
+ sink.configure(
+ Array[String]("num", "wend", "icnt"),
+ Array[TypeInformation[_]](Types.LONG, Types.SQL_TIMESTAMP, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('num, 'w.end as 'wend, 'id.count as 'icnt)
+ .insertInto("upsertSink")
+
+ env.execute()
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getUpsertResults.sorted
+ val expected = List(
+ "1,1970-01-01 00:00:00.005,1",
+ "2,1970-01-01 00:00:00.005,2",
+ "3,1970-01-01 00:00:00.005,1",
+ "3,1970-01-01 00:00:00.010,2",
+ "4,1970-01-01 00:00:00.010,3",
+ "4,1970-01-01 00:00:00.015,1",
+ "5,1970-01-01 00:00:00.015,4",
+ "5,1970-01-01 00:00:00.020,1",
+ "6,1970-01-01 00:00:00.020,4",
+ "6,1970-01-01 00:00:00.025,2").sorted
+ assertEquals(expected, retracted)
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingUpsertTableSink(Array(0, 1, 2), TimeZone.getDefault)
+ sink.expectedKeys = Some(Array("wend", "num"))
+ sink.expectedIsAppendOnly = Some(true)
+ tEnv.registerTableSink(
+ "upsertSink",
+ sink.configure(
+ Array[String]("wstart", "wend", "num", "icnt"),
+ Array[TypeInformation[_]]
+ (Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count as 'icnt)
+ .insertInto("upsertSink")
+
+ env.execute()
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getUpsertResults.sorted
+ val expected = List(
+ "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,1,1",
+ "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,2,2",
+ "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,3,1",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.010,3,2",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.010,4,3",
+ "1970-01-01 00:00:00.010,1970-01-01 00:00:00.015,4,1",
+ "1970-01-01 00:00:00.010,1970-01-01 00:00:00.015,5,4",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.020,5,1",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.020,6,4",
+ "1970-01-01 00:00:00.020,1970-01-01 00:00:00.025,6,2").sorted
+ assertEquals(expected, retracted)
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingUpsertTableSink(Array(0), TimeZone.getDefault)
+ sink.expectedIsAppendOnly = Some(true)
+ tEnv.registerTableSink(
+ "upsertSink",
+ sink.configure(
+ Array[String]("wend", "cnt"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('w.end as 'wend, 'id.count as 'cnt)
+ .insertInto("upsertSink")
+
+ env.execute()
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getRawResults.sorted
+ val expected = List(
+ "(true,1970-01-01 00:00:00.005,1)",
+ "(true,1970-01-01 00:00:00.005,2)",
+ "(true,1970-01-01 00:00:00.005,1)",
+ "(true,1970-01-01 00:00:00.010,2)",
+ "(true,1970-01-01 00:00:00.010,3)",
+ "(true,1970-01-01 00:00:00.015,1)",
+ "(true,1970-01-01 00:00:00.015,4)",
+ "(true,1970-01-01 00:00:00.020,1)",
+ "(true,1970-01-01 00:00:00.020,4)",
+ "(true,1970-01-01 00:00:00.025,2)").sorted
+ assertEquals(expected, retracted)
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val sink = new TestingUpsertTableSink(Array(0), TimeZone.getDefault)
+ sink.expectedIsAppendOnly = Some(true)
+ tEnv.registerTableSink(
+ "upsertSink",
+ sink.configure(
+ Array[String]("num", "cnt"),
+ Array[TypeInformation[_]](Types.LONG, Types.LONG)))
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('num, 'id.count as 'cnt)
+ .insertInto("upsertSink")
+
+ env.execute()
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ sink.getRawResults.exists(_.startsWith("(false,")))
+
+ val retracted = sink.getRawResults.sorted
+ val expected = List(
+ "(true,1,1)",
+ "(true,2,2)",
+ "(true,3,1)",
+ "(true,3,2)",
+ "(true,4,3)",
+ "(true,4,1)",
+ "(true,5,4)",
+ "(true,5,1)",
+ "(true,6,4)",
+ "(true,6,2)").sorted
+ assertEquals(expected, retracted)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToAppendStreamMultiRowtime(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val r = t
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('num, 'w)
+ .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+ r.toAppendStream[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testToRetractStreamMultiRowtime(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = StreamTableEnvironment.create(env)
+
+ val t = env.fromCollection(tupleData3)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ val r = t
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('num, 'w)
+ .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+ r.toRetractStream[Row]
+ }
+}
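The tests above drive their assertions through the expectedKeys and expectedIsAppendOnly fields that the StreamTestSink diff below adds to TestingUpsertTableSink. The check it performs on key fields is order-insensitive over the field names; a tiny standalone sketch of that comparison (values are illustrative):

    object KeyAssertionExample {
      def main(args: Array[String]): Unit = {
        // order-insensitive comparison of key-field names, as in setKeyFields below
        val expected = Array("cnt", "cTrue")
        val provided = Array("cTrue", "cnt")
        val matches = expected.sorted.mkString(",") == provided.sorted.mkString(",")
        println(matches) // prints: true
      }
    }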
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
index b903eed..d44f316 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
@@ -253,17 +253,34 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
var fNames: Array[String] = _
var fTypes: Array[TypeInformation[_]] = _
var sink = new TestingUpsertSink(keys, tz)
+ var expectedKeys: Option[Array[String]] = None
+ var expectedIsAppendOnly: Option[Boolean] = None
def this(keys: Array[Int]) {
this(keys, TimeZone.getTimeZone("UTC"))
}
override def setKeyFields(keys: Array[String]): Unit = {
- // ignore
+ if (expectedKeys.isDefined && keys == null) {
+ throw new AssertionError("Provided key fields should not be null.")
+ } else if (expectedKeys.isEmpty) {
+ return
+ }
+ val expectedStr = expectedKeys.get.sorted.mkString(",")
+ val keysStr = keys.sorted.mkString(",")
+ if (!expectedStr.equals(keysStr)) {
+ throw new AssertionError(
+ s"Provided key fields($keysStr) do not match expected keys($expectedStr)")
+ }
}
override def setIsAppendOnly(isAppendOnly: JBoolean): Unit = {
- // ignore
+ if (expectedIsAppendOnly.isEmpty) {
+ return
+ }
+ if (expectedIsAppendOnly.get != isAppendOnly) {
+ throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly")
+ }
}
override def getRecordType: TypeInformation[BaseRow] =