You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/09 09:25:49 UTC

[flink] branch release-1.11 updated: [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 9984b58  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
9984b58 is described below

commit 9984b5869b2f35b1fb8ce75e09ad73574d9f91ec
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jan 28 12:01:39 2021 +0800

    [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
    
    This closes #14785.
---
 .../planner/codegen/OperatorCodeGenerator.scala    |   3 +
 .../table/planner/codegen/SinkCodeGenerator.scala  |  32 ++-
 .../physical/stream/StreamExecLegacySink.scala     |   8 +-
 .../runtime/stream/table/TableSinkITCase.scala     |  94 +------
 .../stream/table/TableToDataStreamITCase.scala     | 308 +++++++++++++++++++++
 5 files changed, 346 insertions(+), 99 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index a846592..8e3a9ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -256,4 +256,7 @@ object OperatorCodeGenerator extends Logging {
 
   def generateCollect(emit: String): String =
     s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit));"
+
+  def generateCollectWithTimestamp(emit: String, timestampTerm: String): String =
+    s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit, $timestampTerm));"
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index f46bd7c..1bd473d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.util.RowDataUtil
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NO_CODE
-import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, generateCollectWithTimestamp}
 import org.apache.flink.table.planner.sinks.TableSinkUtils
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -48,7 +48,8 @@ object SinkCodeGenerator {
       inputRowType: RowType,
       sink: TableSink[_],
       withChangeFlag: Boolean,
-      operatorName: String): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
+      operatorName: String,
+      rowtimeIndex: Int = -1): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
 
     val physicalOutputType = TableSinkUtils.inferSinkPhysicalDataType(
       sink.getConsumedDataType,
@@ -69,6 +70,7 @@ object SinkCodeGenerator {
 
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
+    var modifiedRowtimeIndex = rowtimeIndex
     val fieldIndexProcessCode = physicalTypeInfo match {
       case pojo: PojoTypeInfo[_] =>
         val mapping = pojo.getFieldNames.map { name =>
@@ -88,6 +90,10 @@ object SinkCodeGenerator {
           (0 until pojo.getArity)
             .map(pojo.getTypeAt)
             .map(fromTypeInfoToLogicalType): _*)
+        if (rowtimeIndex >= 0) {
+          modifiedRowtimeIndex = outputRowType.getFieldIndex(
+            inputRowType.getFieldNames.get(rowtimeIndex))
+        }
         val conversion = resultGenerator.generateConverterResultExpression(
           outputRowType,
           classOf[GenericRowData])
@@ -114,7 +120,7 @@ object SinkCodeGenerator {
            |$tupleClass $resultTerm = new $tupleClass();
            |$resultTerm.setField($flagResultTerm, 0);
            |$resultTerm.setField($outTerm, 1);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       } else {
         // Scala Case Class
@@ -133,11 +139,11 @@ object SinkCodeGenerator {
            |$fieldsTerm[0] = $flagResultTerm;
            |$fieldsTerm[1] = $outTerm;
            |$tupleClass $resultTerm = ($tupleClass) $serializerTerm.createInstance($fieldsTerm);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       }
     } else {
-      generateCollect(outTerm)
+      generateCollectCode(afterIndexModify, outTerm, modifiedRowtimeIndex)
     }
 
     val generated = OperatorCodeGenerator.generateOneInputStreamOperator[RowData, OUT](
@@ -150,4 +156,20 @@ object SinkCodeGenerator {
       inputRowType)
     (new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
   }
+
+  private def generateCollectCode(
+      afterIndexModify: String,
+      resultTerm: String,
+      modifiedRowtimeIndex: Int): String = {
+    if (modifiedRowtimeIndex >= 0) {
+      val rowtimeTerm = CodeGenUtils.newName("rowtime")
+      s"""
+         | Long $rowtimeTerm =
+         | $afterIndexModify.getTimestamp($modifiedRowtimeIndex, 3).getMillisecond();
+         | ${generateCollectWithTimestamp(resultTerm, rowtimeTerm)}
+          """.stripMargin
+    } else {
+      generateCollect(resultTerm)
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
index f8d5006..5aed5ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
@@ -188,13 +188,19 @@ class StreamExecLegacySink[T](
     if (CodeGenUtils.isInternalClass(resultDataType)) {
       parTransformation.asInstanceOf[Transformation[T]]
     } else {
+      val rowtimeIndex = if (rowtimeFields.size == 1) {
+        rowtimeFields.head.getIndex
+      } else {
+        -1
+      }
       val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
         CodeGeneratorContext(config),
         config,
         convType.asInstanceOf[RowDataTypeInfo].toRowType,
         sink,
         withChangeFlag,
-        "SinkConversion"
+        "SinkConversion",
+        rowtimeIndex
       )
       new OneInputTransformation(
         parTransformation,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 6bc2bc0..0a654c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.factories.TestValuesTableFactory.{TestSinkContextTableSink, changelogRow}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData.{nullData4, smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.util.ExceptionUtils
@@ -31,9 +31,6 @@ import org.junit.Test
 
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.sql.Timestamp
-import java.time.{LocalDateTime, OffsetDateTime, ZoneId, ZoneOffset}
-import java.util.TimeZone
 
 import scala.collection.JavaConversions._
 
@@ -622,93 +619,4 @@ class TableSinkITCase extends StreamingTestBase {
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")
     assertEquals(expected.sorted, result.sorted)
   }
-
-  @Test
-  def testSinkContext(): Unit = {
-    val data = List(
-      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
-      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
-      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
-
-    val dataId: String = TestValuesTableFactory.registerData(data)
-
-    val sourceDDL =
-      s"""
-         |CREATE TABLE src (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE,
-         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
-         |) WITH (
-         |  'connector' = 'values',
-         |  'data-id' = '$dataId'
-         |)
-      """.stripMargin
-
-    val sinkDDL =
-      s"""
-         |CREATE TABLE sink (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-
-    tEnv.executeSql(sourceDDL)
-    tEnv.executeSql(sinkDDL)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out a source directly with the rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
-
-    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
-    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-
-    val sinkDDL2 =
-      s"""
-         |CREATE TABLE sink2 (
-         |  window_rowtime TIMESTAMP(3),
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-    tEnv.executeSql(sinkDDL2)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out with additional operator to generate a new rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    execInsertSqlAndWaitResult(
-      """
-        |INSERT INTO sink2
-        |SELECT
-        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
-        |  SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
-        |""".stripMargin
-    )
-
-    val expected2 = List(4999, 9999, 19999)
-    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-  }
-
-  // ------------------------------------------------------------------------------------------
-
-  private def localDateTime(epochSecond: Long): LocalDateTime = {
-    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
new file mode 100644
index 0000000..cd92266
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.TestSinkContextTableSink
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests that the conversion between [[Table]] and [[DataStream]] does
+ * not lose the row time attribute.
+ */
+final class TableToDataStreamITCase extends StreamingTestBase {
+
+  @Test
+  def testHasRowtimeFromTableToAppendStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts FROM src").toAppendStream[Row]
+
+    val expected = List(
+      "A,1970-01-01T00:00:01,1000",
+      "B,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    assertEquals(expected, sink.getResults.sorted)
+
+  }
+
+  @Test
+  def testHasRowtimeFromTableToRetractStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "A"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery(
+      """
+        |SELECT a, ts
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) as rowNum
+        |  FROM src
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    ).toRetractStream[Row]
+
+    val sink = new StringWithTimestampRetractSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToRetractStream")
+
+    val expected = List(
+      "A,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+    assertEquals(expected, sink.getRetractResults.sorted)
+
+    val expectedRetract = List(
+      "(true,A,1970-01-01T00:00:01,1000)",
+      "(false,A,1970-01-01T00:00:01,1000)",
+      "(true,A,1970-01-01T00:00:02,2000)",
+      "(true,C,1970-01-01T00:00:03,3000)",
+      "(true,D,1970-01-01T00:00:04,4000)",
+      "(true,E,1970-01-01T00:00:07,7000)")
+    assertEquals(expectedRetract.sorted, sink.getRawResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromDataStreamToTableBackDataStream(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // second to millisecond
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)
+    tEnv.registerTable("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, rowtime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val expected = List(
+      "A_,1,1970-01-01T00:00:01,1000",
+      "B_,2,1970-01-01T00:00:02,2000",
+      "C_,3,1970-01-01T00:00:03,3000",
+      "D_,4,1970-01-01T00:00:04,4000",
+      "E_,7,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromTableToExternalSystem(): Unit = {
+    val data = List(
+      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
+      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
+      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    val sinkDDL =
+      s"""
+         |CREATE TABLE sink (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    tEnv.executeSql(sinkDDL)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out a source directly with the rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
+
+    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
+    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+
+    val sinkDDL2 =
+      s"""
+         |CREATE TABLE sink2 (
+         |  window_rowtime TIMESTAMP(3),
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+    tEnv.executeSql(sinkDDL2)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out with additional operator to generate a new rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    execInsertSqlAndWaitResult(
+      """
+        |INSERT INTO sink2
+        |SELECT
+        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
+        |  SUM(b)
+        |FROM src
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+        |""".stripMargin
+    )
+
+    val expected2 = List(4999, 9999, 19999)
+    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+  }
+
+  private def localDateTime(epochSecond: Long): LocalDateTime = {
+    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+  }
+}
+
+/**
+ * Append test sink that stores each record as a string suffixed with its timestamp.
+ */
+final class StringWithTimestampSink[T] extends AbstractExactlyOnceSink[T]() {
+
+  override def invoke(value: T, context: SinkFunction.Context[_]) {
+    localResults += s"${value.toString},${context.timestamp()}"
+  }
+
+  override def getResults: List[String] = super.getResults
+}
+
+/**
+ * Retract test sink that stores each record as a string suffixed with its timestamp.
+ */
+final class StringWithTimestampRetractSink[T](tz: TimeZone) extends
+  TestingRetractSink(tz) {
+
+  def this() {
+    this(TimeZone.getTimeZone("UTC"))
+  }
+
+  override def invoke(v: (Boolean, Row), context: SinkFunction.Context[_]): Unit = {
+    this.synchronized {
+      val rowString = s"${TestSinkUtil.rowToString(v._2, tz)},${context.timestamp()}"
+
+      val tupleString = "(" + v._1.toString + "," + rowString + ")"
+      localResults += tupleString
+      if (v._1) {
+        localRetractResults += rowString
+      } else {
+        val index = localRetractResults.indexOf(rowString)
+        if (index >= 0) {
+          localRetractResults.remove(index)
+        } else {
+          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+            "This is probably an incorrectly implemented test. " +
+            "Try to set the parallelism of the sink to 1.")
+        }
+      }
+    }
+  }
+
+  override def getResults: List[String] = super.getResults
+}