Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/09 10:37:30 UTC

[flink] branch release-1.12 updated: [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 20030c2  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
20030c2 is described below

commit 20030c255849cc1f0a01045fdaa8a16d883726b6
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jan 28 12:01:39 2021 +0800

    [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
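    
    When the converted sink schema contains exactly one rowtime attribute, the
    generated sink conversion operator now reads that TIMESTAMP(3) column and
    copies its value into the emitted StreamRecord's timestamp. Roughly, the
    generated collect call takes the following shape (illustrative only; the
    identifiers stand in for generated terms):
    
        Long rowtime = row.getTimestamp(rowtimeIndex, 3).getMillisecond();
        output.collect(outElement.replace(result, rowtime));
    
    With that, Table-to-DataStream conversions (toAppendStream/toRetractStream)
    and legacy sinks observe the row time through the record timestamp, e.g.
    via SinkFunction.Context#timestamp().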
    
    This closes #14785.
---
 .../planner/codegen/OperatorCodeGenerator.scala    |   3 +
 .../table/planner/codegen/SinkCodeGenerator.scala  |  32 ++-
 .../physical/stream/StreamExecLegacySink.scala     |   8 +-
 .../runtime/stream/table/TableSinkITCase.scala     | 100 +------
 .../stream/table/TableToDataStreamITCase.scala     | 308 +++++++++++++++++++++
 5 files changed, 349 insertions(+), 102 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index a846592..8e3a9ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -256,4 +256,7 @@ object OperatorCodeGenerator extends Logging {
 
   def generateCollect(emit: String): String =
     s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit));"
+
+  def generateCollectWithTimestamp(emit: String, timestampTerm: String): String =
+    s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit, $timestampTerm));"
 }
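
For context, generateCollectWithTimestamp relies on the two-argument StreamRecord#replace, which sets both the element and the timestamp of the reused record. A hand-written Scala equivalent of the snippet it emits might look like the following (a minimal sketch; the method and parameter names are illustrative, the real code is produced as a string by SinkCodeGenerator below):

    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    import org.apache.flink.table.data.RowData
    import org.apache.flink.util.Collector

    // Minimal sketch: read the TIMESTAMP(3) rowtime column as epoch millis and
    // attach it to the reused StreamRecord before collecting.
    def collectWithRowtime(
        out: Collector[StreamRecord[RowData]],
        reuse: StreamRecord[RowData],
        row: RowData,
        rowtimeIndex: Int): Unit = {
      val rowtimeMillis: Long = row.getTimestamp(rowtimeIndex, 3).getMillisecond
      out.collect(reuse.replace(row, rowtimeMillis))
    }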
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index e5e4edd..fc97cc0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.util.RowDataUtil
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{genToExternalConverter, genToExternalConverterWithLegacy}
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NO_CODE
-import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, generateCollectWithTimestamp}
 import org.apache.flink.table.planner.sinks.TableSinkUtils
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -48,7 +48,8 @@ object SinkCodeGenerator {
       inputRowType: RowType,
       sink: TableSink[_],
       withChangeFlag: Boolean,
-      operatorName: String): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
+      operatorName: String,
+      rowtimeIndex: Int = -1): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
 
     val physicalOutputType = TableSinkUtils.inferSinkPhysicalDataType(
       sink.getConsumedDataType,
@@ -69,6 +70,7 @@ object SinkCodeGenerator {
 
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
+    var modifiedRowtimeIndex = rowtimeIndex
     val fieldIndexProcessCode = physicalTypeInfo match {
       case pojo: PojoTypeInfo[_] =>
         val mapping = pojo.getFieldNames.map { name =>
@@ -88,6 +90,10 @@ object SinkCodeGenerator {
           (0 until pojo.getArity)
             .map(pojo.getTypeAt)
             .map(fromTypeInfoToLogicalType): _*)
+        if (rowtimeIndex >= 0) {
+          modifiedRowtimeIndex = outputRowType.getFieldIndex(
+            inputRowType.getFieldNames.get(rowtimeIndex))
+        }
         val conversion = resultGenerator.generateConverterResultExpression(
           outputRowType,
           classOf[GenericRowData])
@@ -115,7 +121,7 @@ object SinkCodeGenerator {
            |$tupleClass $resultTerm = new $tupleClass();
            |$resultTerm.setField($flagResultTerm, 0);
            |$resultTerm.setField($outTerm, 1);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       } else {
         // Scala Case Class
@@ -134,11 +140,11 @@ object SinkCodeGenerator {
            |$fieldsTerm[0] = $flagResultTerm;
            |$fieldsTerm[1] = $outTerm;
            |$tupleClass $resultTerm = ($tupleClass) $serializerTerm.createInstance($fieldsTerm);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       }
     } else {
-      generateCollect(outTerm)
+      generateCollectCode(afterIndexModify, outTerm, modifiedRowtimeIndex)
     }
 
     val generated = OperatorCodeGenerator.generateOneInputStreamOperator[RowData, OUT](
@@ -151,4 +157,20 @@ object SinkCodeGenerator {
       inputRowType)
     (new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
   }
+
+  private def generateCollectCode(
+      afterIndexModify: String,
+      resultTerm: String,
+      modifiedRowtimeIndex: Int): String = {
+    if (modifiedRowtimeIndex >= 0) {
+      val rowtimeTerm = CodeGenUtils.newName("rowtime")
+      s"""
+         | Long $rowtimeTerm =
+         | $afterIndexModify.getTimestamp($modifiedRowtimeIndex, 3).getMillisecond();
+         | ${generateCollectWithTimestamp(resultTerm, rowtimeTerm)}
+          """.stripMargin
+    } else {
+      generateCollect(resultTerm)
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
index 2dec343..483edd7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
@@ -188,13 +188,19 @@ class StreamExecLegacySink[T](
     if (CodeGenUtils.isInternalClass(resultDataType)) {
       parTransformation.asInstanceOf[Transformation[T]]
     } else {
+      val rowtimeIndex = if (rowtimeFields.size == 1) {
+        rowtimeFields.head.getIndex
+      } else {
+        -1
+      }
       val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
         CodeGeneratorContext(config),
         config,
         convType.asInstanceOf[InternalTypeInfo[RowData]].toRowType,
         sink,
         withChangeFlag,
-        "SinkConversion"
+        "SinkConversion",
+        rowtimeIndex
       )
       new OneInputTransformation(
         parTransformation,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 2eda6a9..a1256a2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -22,18 +22,16 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.factories.TestValuesTableFactory
-.{TestSinkContextTableSink, changelogRow}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
-import org.apache.flink.table.planner.runtime.utils.TestData
-.{nullData4, smallTupleData3,
-tupleData3, tupleData5,data1}
+import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.util.ExceptionUtils
+
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
+
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.time.{LocalDateTime, ZoneOffset}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
@@ -626,89 +624,6 @@ class TableSinkITCase extends StreamingTestBase {
   }
 
   @Test
-  def testSinkContext(): Unit = {
-    val data = List(
-      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
-      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
-      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
-
-    val dataId: String = TestValuesTableFactory.registerData(data)
-
-    val sourceDDL =
-      s"""
-         |CREATE TABLE src (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE,
-         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
-         |) WITH (
-         |  'connector' = 'values',
-         |  'data-id' = '$dataId'
-         |)
-      """.stripMargin
-
-    val sinkDDL =
-      s"""
-         |CREATE TABLE sink (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-
-    tEnv.executeSql(sourceDDL)
-    tEnv.executeSql(sinkDDL)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out a source directly with the rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await()
-
-    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
-    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-
-    val sinkDDL2 =
-      s"""
-         |CREATE TABLE sink2 (
-         |  window_rowtime TIMESTAMP(3),
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-    tEnv.executeSql(sinkDDL2)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out with additional operator to generate a new rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    tEnv.executeSql(
-      """
-        |INSERT INTO sink2
-        |SELECT
-        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
-        |  SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
-        |""".stripMargin
-    ).await()
-
-    val expected2 = List(4999, 9999, 19999)
-    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-  }
-
-  @Test
   def testMetadataSourceAndSink(): Unit = {
     val dataId = TestValuesTableFactory.registerData(nullData4)
     // tests metadata at different locations and casting in both sources and sinks
@@ -919,7 +834,6 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
-
   private def innerTestSetParallelism(provider: String, parallelism: Int, index: Int): Unit = {
     val dataId = TestValuesTableFactory.registerData(data1)
     val sourceTableName = s"test_para_source_${provider.toLowerCase.trim}_$index"
@@ -951,10 +865,4 @@ class TableSinkITCase extends StreamingTestBase {
          |""".stripMargin)
     tEnv.executeSql(s"INSERT INTO $sinkTableName SELECT * FROM $sourceTableName").await()
   }
-
-  // ------------------------------------------------------------------------------------------
-
-  private def localDateTime(epochSecond: Long): LocalDateTime = {
-    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
new file mode 100644
index 0000000..dc3a2eb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.TestSinkContextTableSink
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests that the conversion between [[Table]] and [[DataStream]] does not
+ * lose the rowtime attribute.
+ */
+final class TableToDataStreamITCase extends StreamingTestBase {
+
+  @Test
+  def testHasRowtimeFromTableToAppendStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts FROM src").toAppendStream[Row]
+
+    val expected = List(
+      "A,1970-01-01T00:00:01, 1000",
+      "B,1970-01-01T00:00:02, 2000",
+      "C,1970-01-01T00:00:03, 3000",
+      "D,1970-01-01T00:00:04, 4000",
+      "E,1970-01-01T00:00:07, 7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromTableToRetractStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "A"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery(
+      """
+        |SELECT a, ts
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) as rowNum
+        |  FROM src
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    ).toRetractStream[Row]
+
+    val sink = new StringWithTimestampRetractSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToRetractStream")
+
+    val expected = List(
+      "A,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+    assertEquals(expected, sink.getRetractResults.sorted)
+
+    val expectedRetract = List(
+      "(true,A,1970-01-01T00:00:01,1000)",
+      "(false,A,1970-01-01T00:00:01,1000)",
+      "(true,A,1970-01-01T00:00:02,2000)",
+      "(true,C,1970-01-01T00:00:03,3000)",
+      "(true,D,1970-01-01T00:00:04,4000)",
+      "(true,E,1970-01-01T00:00:07,7000)")
+    assertEquals(expectedRetract.sorted, sink.getRawResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromDataStreamToTableBackDataStream(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // convert seconds to milliseconds
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)
+    tEnv.registerTable("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, rowtime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val expected = List(
+      "A_,1,1970-01-01T00:00:01, 1000",
+      "B_,2,1970-01-01T00:00:02, 2000",
+      "C_,3,1970-01-01T00:00:03, 3000",
+      "D_,4,1970-01-01T00:00:04, 4000",
+      "E_,7,1970-01-01T00:00:07, 7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromTableToExternalSystem(): Unit = {
+    val data = List(
+      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
+      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
+      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    val sinkDDL =
+      s"""
+         |CREATE TABLE sink (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    tEnv.executeSql(sinkDDL)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out a source directly with the rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await()
+
+    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
+    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+
+    val sinkDDL2 =
+      s"""
+         |CREATE TABLE sink2 (
+         |  window_rowtime TIMESTAMP(3),
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+    tEnv.executeSql(sinkDDL2)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out with additional operator to generate a new rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    tEnv.executeSql(
+      """
+        |INSERT INTO sink2
+        |SELECT
+        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
+        |  SUM(b)
+        |FROM src
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+        |""".stripMargin
+    ).await()
+
+    val expected2 = List(4999, 9999, 19999)
+    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+  }
+
+  private def localDateTime(epochSecond: Long): LocalDateTime = {
+    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+  }
+}
+
+/**
+ * Append test sink that outputs each record together with its timestamp.
+ */
+final class StringWithTimestampSink[T] extends AbstractExactlyOnceSink[T]() {
+
+  override def invoke(value: T, context: SinkFunction.Context): Unit = {
+    localResults += s"${value.toString}, ${context.timestamp()}"
+  }
+
+  override def getResults: List[String] = super.getResults
+}
+
+/**
+ * Retract test sink that outputs each record together with its timestamp.
+ */
+final class StringWithTimestampRetractSink[T](tz: TimeZone) extends
+  TestingRetractSink(tz) {
+
+  def this() {
+    this(TimeZone.getTimeZone("UTC"))
+  }
+
+  override def invoke(v: (Boolean, Row), context: SinkFunction.Context): Unit = {
+    this.synchronized {
+      val rowString = s"${TestSinkUtil.rowToString(v._2, tz)},${context.timestamp()}"
+
+      val tupleString = "(" + v._1.toString + "," + rowString + ")"
+      localResults += tupleString
+      if (v._1) {
+        localRetractResults += rowString
+      } else {
+        val index = localRetractResults.indexOf(rowString)
+        if (index >= 0) {
+          localRetractResults.remove(index)
+        } else {
+          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+            "This is probably an incorrectly implemented test. " +
+            "Try to set the parallelism of the sink to 1.")
+        }
+      }
+    }
+  }
+
+  override def getResults: List[String] = super.getResults
+}