You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/09 09:25:49 UTC
[flink] branch release-1.11 updated:
[FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in
Blink planner
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 9984b58 [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
9984b58 is described below
commit 9984b5869b2f35b1fb8ce75e09ad73574d9f91ec
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jan 28 12:01:39 2021 +0800
[FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
This closes #14785.
---
.../planner/codegen/OperatorCodeGenerator.scala | 3 +
.../table/planner/codegen/SinkCodeGenerator.scala | 32 ++-
.../physical/stream/StreamExecLegacySink.scala | 8 +-
.../runtime/stream/table/TableSinkITCase.scala | 94 +------
.../stream/table/TableToDataStreamITCase.scala | 308 +++++++++++++++++++++
5 files changed, 346 insertions(+), 99 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index a846592..8e3a9ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -256,4 +256,7 @@ object OperatorCodeGenerator extends Logging {
def generateCollect(emit: String): String =
s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit));"
+
+ // Same as generateCollect, but the generated replace(...) call also receives `timestampTerm`.
+ def generateCollectWithTimestamp(emit: String, timestampTerm: String): String =
+ s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit, $timestampTerm));"
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index f46bd7c..1bd473d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.util.RowDataUtil
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
import org.apache.flink.table.planner.codegen.GeneratedExpression.NO_CODE
-import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, generateCollectWithTimestamp}
import org.apache.flink.table.planner.sinks.TableSinkUtils
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -48,7 +48,8 @@ object SinkCodeGenerator {
inputRowType: RowType,
sink: TableSink[_],
withChangeFlag: Boolean,
- operatorName: String): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
+ operatorName: String,
+ rowtimeIndex: Int = -1): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
val physicalOutputType = TableSinkUtils.inferSinkPhysicalDataType(
sink.getConsumedDataType,
@@ -69,6 +70,7 @@ object SinkCodeGenerator {
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
var afterIndexModify = inputTerm
+ var modifiedRowtimeIndex = rowtimeIndex
val fieldIndexProcessCode = physicalTypeInfo match {
case pojo: PojoTypeInfo[_] =>
val mapping = pojo.getFieldNames.map { name =>
@@ -88,6 +90,10 @@ object SinkCodeGenerator {
(0 until pojo.getArity)
.map(pojo.getTypeAt)
.map(fromTypeInfoToLogicalType): _*)
+ if (rowtimeIndex >= 0) {
+ modifiedRowtimeIndex = outputRowType.getFieldIndex(
+ inputRowType.getFieldNames.get(rowtimeIndex))
+ }
val conversion = resultGenerator.generateConverterResultExpression(
outputRowType,
classOf[GenericRowData])
@@ -114,7 +120,7 @@ object SinkCodeGenerator {
|$tupleClass $resultTerm = new $tupleClass();
|$resultTerm.setField($flagResultTerm, 0);
|$resultTerm.setField($outTerm, 1);
- |${generateCollect(resultTerm)}
+ |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
""".stripMargin
} else {
// Scala Case Class
@@ -133,11 +139,11 @@ object SinkCodeGenerator {
|$fieldsTerm[0] = $flagResultTerm;
|$fieldsTerm[1] = $outTerm;
|$tupleClass $resultTerm = ($tupleClass) $serializerTerm.createInstance($fieldsTerm);
- |${generateCollect(resultTerm)}
+ |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
""".stripMargin
}
} else {
- generateCollect(outTerm)
+ generateCollectCode(afterIndexModify, outTerm, modifiedRowtimeIndex)
}
val generated = OperatorCodeGenerator.generateOneInputStreamOperator[RowData, OUT](
@@ -150,4 +156,20 @@ object SinkCodeGenerator {
inputRowType)
(new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
}
+
+  // Collects `resultTerm`; a non-negative rowtime index adds that field's millis as timestamp.
+  private def generateCollectCode(
+      afterIndexModify: String, resultTerm: String, modifiedRowtimeIndex: Int): String = {
+    if (modifiedRowtimeIndex >= 0) {
+      val rowtimeTerm = CodeGenUtils.newName("rowtime")
+      // primitive long: the millis are only handed on to replace(...), boxing them is overhead
+      s"""
+         | long $rowtimeTerm =
+         | $afterIndexModify.getTimestamp($modifiedRowtimeIndex, 3).getMillisecond();
+         | ${generateCollectWithTimestamp(resultTerm, rowtimeTerm)}
+        """.stripMargin
+    } else {
+      generateCollect(resultTerm)
+    }
+  }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
index f8d5006..5aed5ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
@@ -188,13 +188,19 @@ class StreamExecLegacySink[T](
if (CodeGenUtils.isInternalClass(resultDataType)) {
parTransformation.asInstanceOf[Transformation[T]]
} else {
+ val rowtimeIndex = if (rowtimeFields.size == 1) {
+ rowtimeFields.head.getIndex
+ } else {
+ -1
+ }
val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
CodeGeneratorContext(config),
config,
convType.asInstanceOf[RowDataTypeInfo].toRowType,
sink,
withChangeFlag,
- "SinkConversion"
+ "SinkConversion",
+ rowtimeIndex
)
new OneInputTransformation(
parTransformation,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 6bc2bc0..0a654c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.factories.TestValuesTableFactory.{TestSinkContextTableSink, changelogRow}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
import org.apache.flink.table.planner.runtime.utils.TestData.{nullData4, smallTupleData3, tupleData3, tupleData5}
import org.apache.flink.util.ExceptionUtils
@@ -31,9 +31,6 @@ import org.junit.Test
import java.lang.{Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
-import java.sql.Timestamp
-import java.time.{LocalDateTime, OffsetDateTime, ZoneId, ZoneOffset}
-import java.util.TimeZone
import scala.collection.JavaConversions._
@@ -622,93 +619,4 @@ class TableSinkITCase extends StreamingTestBase {
val expected = List("book,1,12", "book,4,11", "fruit,3,44")
assertEquals(expected.sorted, result.sorted)
}
-
- @Test
- def testSinkContext(): Unit = {
- val data = List(
- rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
- rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
- rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
- rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
- rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
- rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
- rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
-
- val dataId: String = TestValuesTableFactory.registerData(data)
-
- val sourceDDL =
- s"""
- |CREATE TABLE src (
- | log_ts STRING,
- | ts TIMESTAMP(3),
- | a INT,
- | b DOUBLE,
- | WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
- |) WITH (
- | 'connector' = 'values',
- | 'data-id' = '$dataId'
- |)
- """.stripMargin
-
- val sinkDDL =
- s"""
- |CREATE TABLE sink (
- | log_ts STRING,
- | ts TIMESTAMP(3),
- | a INT,
- | b DOUBLE
- |) WITH (
- | 'connector' = 'values',
- | 'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
- |)
- """.stripMargin
-
- tEnv.executeSql(sourceDDL)
- tEnv.executeSql(sinkDDL)
-
- //---------------------------------------------------------------------------------------
- // Verify writing out a source directly with the rowtime attribute
- //---------------------------------------------------------------------------------------
-
- execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
-
- val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
- assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-
- val sinkDDL2 =
- s"""
- |CREATE TABLE sink2 (
- | window_rowtime TIMESTAMP(3),
- | b DOUBLE
- |) WITH (
- | 'connector' = 'values',
- | 'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
- |)
- """.stripMargin
- tEnv.executeSql(sinkDDL2)
-
- //---------------------------------------------------------------------------------------
- // Verify writing out with additional operator to generate a new rowtime attribute
- //---------------------------------------------------------------------------------------
-
- execInsertSqlAndWaitResult(
- """
- |INSERT INTO sink2
- |SELECT
- | TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
- | SUM(b)
- |FROM src
- |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
- |""".stripMargin
- )
-
- val expected2 = List(4999, 9999, 19999)
- assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
- }
-
- // ------------------------------------------------------------------------------------------
-
- private def localDateTime(epochSecond: Long): LocalDateTime = {
- LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
new file mode 100644
index 0000000..cd92266
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.TestSinkContextTableSink
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests that converting between [[Table]] and [[DataStream]] does not
+ * lose the row time attribute.
+ */
+final class TableToDataStreamITCase extends StreamingTestBase {
+
+  // The append stream must hand `ts` (as epoch millis) to the sink as the record timestamp.
+  @Test
+  def testHasRowtimeFromTableToAppendStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         | ts TIMESTAMP(3),
+         | a STRING,
+         | WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '$dataId'
+         |)
+        """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts FROM src").toAppendStream[Row]
+
+    val expected = List(
+      "A,1970-01-01T00:00:01,1000",
+      "B,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  // The retract stream (latest row per `a`) must carry `ts` as timestamp on every change.
+  @Test
+  def testHasRowtimeFromTableToRetractStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "A"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         | ts TIMESTAMP(3),
+         | a STRING,
+         | WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '$dataId'
+         |)
+        """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery(
+      """
+        |SELECT a, ts
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) as rowNum
+        | FROM src
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    ).toRetractStream[Row]
+
+    val sink = new StringWithTimestampRetractSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToRetractStream")
+
+    val expected = List(
+      "A,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+    assertEquals(expected, sink.getRetractResults.sorted)
+
+    val expectedRetract = List(
+      "(true,A,1970-01-01T00:00:01,1000)",
+      "(false,A,1970-01-01T00:00:01,1000)",
+      "(true,A,1970-01-01T00:00:02,2000)",
+      "(true,C,1970-01-01T00:00:03,3000)",
+      "(true,D,1970-01-01T00:00:04,4000)",
+      "(true,E,1970-01-01T00:00:07,7000)")
+    assertEquals(expectedRetract.sorted, sink.getRawResults.sorted)
+  }
+
+  // Timestamps assigned on a DataStream must survive the round trip through a Table.
+  @Test
+  def testHasRowtimeFromDataStreamToTableBackDataStream(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // second to millisecond
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)
+    tEnv.createTemporaryView("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, rowtime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val expected = List(
+      "A_,1,1970-01-01T00:00:01,1000",
+      "B_,2,1970-01-01T00:00:02,2000",
+      "C_,3,1970-01-01T00:00:03,3000",
+      "D_,4,1970-01-01T00:00:04,4000",
+      "E_,7,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  // The rowtimes recorded by the test table sink must match, also after a window aggregation.
+  @Test
+  def testHasRowtimeFromTableToExternalSystem(): Unit = {
+    val data = List(
+      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
+      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
+      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         | log_ts STRING,
+         | ts TIMESTAMP(3),
+         | a INT,
+         | b DOUBLE,
+         | WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '$dataId'
+         |)
+        """.stripMargin
+
+    val sinkDDL =
+      s"""
+         |CREATE TABLE sink (
+         | log_ts STRING,
+         | ts TIMESTAMP(3),
+         | a INT,
+         | b DOUBLE
+         |) WITH (
+         | 'connector' = 'values',
+         | 'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+        """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    tEnv.executeSql(sinkDDL)
+
+    // Verify writing out a source directly with the rowtime attribute
+
+    execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
+
+    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
+    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+
+    val sinkDDL2 =
+      s"""
+         |CREATE TABLE sink2 (
+         | window_rowtime TIMESTAMP(3),
+         | b DOUBLE
+         |) WITH (
+         | 'connector' = 'values',
+         | 'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+        """.stripMargin
+    tEnv.executeSql(sinkDDL2)
+
+    // Verify writing out with additional operator to generate a new rowtime attribute
+
+    execInsertSqlAndWaitResult(
+      """
+        |INSERT INTO sink2
+        |SELECT
+        | TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
+        | SUM(b)
+        |FROM src
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+        |""".stripMargin
+    )
+
+    val expected2 = List(4999, 9999, 19999)
+    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+  }
+
+  // Builds the UTC local date-time that lies `epochSecond` seconds after the epoch.
+  private def localDateTime(epochSecond: Long): LocalDateTime = {
+    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+  }
+}
+
+/**
+ * Append test sink that records each element as "<value>,<sink context timestamp>".
+ */
+final class StringWithTimestampSink[T] extends AbstractExactlyOnceSink[T]() {
+
+  override def invoke(value: T, context: SinkFunction.Context[_]): Unit = {
+    localResults += s"${value.toString},${context.timestamp()}"
+  }
+
+  override def getResults: List[String] = super.getResults
+}
+
+/**
+ * Retract test sink that records each change together with the sink context timestamp.
+ */
+final class StringWithTimestampRetractSink[T](tz: TimeZone) extends
+  TestingRetractSink(tz) {
+
+  def this() = this(TimeZone.getTimeZone("UTC"))
+
+  override def invoke(v: (Boolean, Row), context: SinkFunction.Context[_]): Unit = {
+    this.synchronized {
+      val rowString = s"${TestSinkUtil.rowToString(v._2, tz)},${context.timestamp()}"
+      // raw log of every change, formatted as "(<flag>,<row>,<timestamp>)"
+      val tupleString = s"(${v._1},$rowString)"
+      localResults += tupleString
+      if (v._1) {
+        localRetractResults += rowString
+      } else {
+        // a retraction removes the first identical entry that was added before
+        val index = localRetractResults.indexOf(rowString)
+        if (index >= 0) {
+          localRetractResults.remove(index)
+        } else {
+          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+            "This is probably an incorrectly implemented test. " +
+            "Try to set the parallelism of the sink to 1.")
+        }
+      }
+    }
+  }
+
+  // NOTE(review): presumably redeclared to make the inherited accessor public -- confirm
+  override def getResults: List[String] = super.getResults
+}