Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/09 08:39:31 UTC

[flink] branch master updated: [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e955ec1  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
e955ec1 is described below

commit e955ec1d1913a2c244a1e3835304c97fc97b3ea1
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Jan 28 14:57:03 2021 +0800

    [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
    
    This closes #14787.
---
 .../nodes/exec/common/CommonExecLegacySink.java    |  22 +-
 .../planner/codegen/OperatorCodeGenerator.scala    |   3 +
 .../table/planner/codegen/SinkCodeGenerator.scala  |  32 ++-
 .../runtime/stream/table/TableSinkITCase.scala     |  92 +-----
 .../stream/table/TableToDataStreamITCase.scala     | 308 +++++++++++++++++++++
 5 files changed, 360 insertions(+), 97 deletions(-)
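
For illustration, a minimal sketch (not part of this patch) of the user-visible behavior the change enables: once a Table with a rowtime attribute is converted back to a DataStream, the rowtime is carried in the StreamRecord timestamp, so ctx.timestamp() is populated downstream. The object name and values below are illustrative only; the API calls follow the standard Scala bridge used by the new ITCase.

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    object RowtimePropagationSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)

        val input = env
          .fromCollection(Seq((1L, "A"), (2L, "B"), (3L, "C")))
          .assignAscendingTimestamps(_._1 * 1000L) // seconds to milliseconds

        // 'rowtime.rowtime exposes the StreamRecord timestamp as a rowtime attribute
        val table = input.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)

        table.toAppendStream[Row]
          .process(new ProcessFunction[Row, String] {
            override def processElement(
                value: Row,
                ctx: ProcessFunction[Row, String]#Context,
                out: Collector[String]): Unit = {
              // with this fix, ctx.timestamp() carries the rowtime (1000, 2000, 3000)
              out.collect(s"$value @ ${ctx.timestamp()}")
            }
          })
          .print()

        env.execute("rowtime propagation sketch")
      }
    }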

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
index 479ffc6..1cad481 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.sinks.DataStreamTableSink;
 import org.apache.flink.table.planner.sinks.TableSinkUtils;
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.StreamTableSink;
@@ -47,7 +48,9 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
 * Base {@link ExecNode} to write data into an external sink defined by a {@link TableSink}.
@@ -171,6 +174,8 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> {
         if (CodeGenUtils.isInternalClass(resultDataType)) {
             return (Transformation<T>) inputTransform;
         } else {
+            final int rowtimeIndex = getRowtimeIndex(inputRowType);
+
             final DataType physicalOutputType =
                     TableSinkUtils.inferSinkPhysicalDataType(
                             resultDataType, convertedInputRowType, withChangeFlag);
@@ -186,7 +191,8 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> {
                             tableSink,
                             physicalOutputType,
                             withChangeFlag,
-                            "SinkConversion");
+                            "SinkConversion",
+                            rowtimeIndex);
             return new OneInputTransformation<>(
                     inputTransform,
                     "SinkConversionTo" + resultDataType.getConversionClass().getSimpleName(),
@@ -195,4 +201,18 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> {
                     inputTransform.getParallelism());
         }
     }
+
+    private int getRowtimeIndex(RowType inputRowType) {
+        int rowtimeIndex = -1;
+        final List<Integer> rowtimeFieldIndices = new ArrayList<>();
+        for (int i = 0; i < inputRowType.getFieldCount(); ++i) {
+            if (TypeCheckUtils.isRowTime(inputRowType.getTypeAt(i))) {
+                rowtimeFieldIndices.add(i);
+            }
+        }
+        if (rowtimeFieldIndices.size() == 1) {
+            rowtimeIndex = rowtimeFieldIndices.get(0);
+        }
+        return rowtimeIndex;
+    }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index a846592..8e3a9ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -256,4 +256,7 @@ object OperatorCodeGenerator extends Logging {
 
   def generateCollect(emit: String): String =
     s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit));"
+
+  def generateCollectWithTimestamp(emit: String, timestampTerm: String): String =
+    s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit, $timestampTerm));"
 }
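
For illustration (not from the patch), the new helper emits a call to StreamRecord#replace(value, timestamp), so the reused output record carries the rowtime, whereas the plain generateCollect variant leaves it without a timestamp. A hand-written sketch of the runtime effect, with stand-in values:

    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    import org.apache.flink.table.data.GenericRowData

    object CollectWithTimestampSketch {
      def main(args: Array[String]): Unit = {
        val outElement = new StreamRecord[AnyRef](null) // reused wrapper, as in the generated operator
        val result = GenericRowData.of()                // stand-in for the converted sink record
        val rowtime = 1000L                             // rowtime in epoch milliseconds

        // generateCollect(result)                  => collector.collect(outElement.replace(result))
        // generateCollectWithTimestamp(result, ts) => collector.collect(outElement.replace(result, ts))
        val withTs = outElement.replace(result, rowtime)
        println(s"hasTimestamp=${withTs.hasTimestamp}, timestamp=${withTs.getTimestamp}")
      }
    }
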
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 2d135da..c473342 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.util.RowDataUtil
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternalConverterWithLegacy
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NO_CODE
-import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, generateCollectWithTimestamp}
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -68,10 +68,12 @@ object SinkCodeGenerator {
       sink: TableSink[_],
       physicalOutputType: DataType,
       withChangeFlag: Boolean,
-      operatorName: String): CodeGenOperatorFactory[OUT] = {
+      operatorName: String,
+      rowtimeIndex: Int = -1): CodeGenOperatorFactory[OUT] = {
     val physicalTypeInfo = fromDataTypeToTypeInfo(physicalOutputType)
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
+    var modifiedRowtimeIndex = rowtimeIndex
     val fieldIndexProcessCode = physicalTypeInfo match {
       case pojo: PojoTypeInfo[_] =>
         val mapping = pojo.getFieldNames.map { name =>
@@ -91,6 +93,10 @@ object SinkCodeGenerator {
           (0 until pojo.getArity)
             .map(pojo.getTypeAt)
             .map(fromTypeInfoToLogicalType): _*)
+        if (rowtimeIndex >= 0) {
+          modifiedRowtimeIndex = outputRowType.getFieldIndex(
+            inputRowType.getFieldNames.get(rowtimeIndex))
+        }
         val conversion = resultGenerator.generateConverterResultExpression(
           outputRowType,
           classOf[GenericRowData])
@@ -118,7 +124,7 @@ object SinkCodeGenerator {
            |$tupleClass $resultTerm = new $tupleClass();
            |$resultTerm.setField($flagResultTerm, 0);
            |$resultTerm.setField($outTerm, 1);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       } else {
         // Scala Case Class
@@ -137,11 +143,11 @@ object SinkCodeGenerator {
            |$fieldsTerm[0] = $flagResultTerm;
            |$fieldsTerm[1] = $outTerm;
            |$tupleClass $resultTerm = ($tupleClass) $serializerTerm.createInstance($fieldsTerm);
-           |${generateCollect(resultTerm)}
+           |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
          """.stripMargin
       }
     } else {
-      generateCollect(outTerm)
+      generateCollectCode(afterIndexModify, outTerm, modifiedRowtimeIndex)
     }
 
     val generated = OperatorCodeGenerator.generateOneInputStreamOperator[RowData, OUT](
@@ -154,4 +160,20 @@ object SinkCodeGenerator {
       inputRowType)
     new CodeGenOperatorFactory[OUT](generated)
   }
+
+  private def generateCollectCode(
+      afterIndexModify: String,
+      resultTerm: String,
+      modifiedRowtimeIndex: Int): String = {
+    if (modifiedRowtimeIndex >= 0) {
+      val rowtimeTerm = CodeGenUtils.newName("rowtime")
+      s"""
+         | Long $rowtimeTerm =
+         | $afterIndexModify.getTimestamp($modifiedRowtimeIndex, 3).getMillisecond();
+         | ${generateCollectWithTimestamp(resultTerm, rowtimeTerm)}
+          """.stripMargin
+    } else {
+      generateCollect(resultTerm)
+    }
+  }
 }
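
Again for illustration only: when a single rowtime field is present, the generated code first reads the rowtime out of the input RowData before collecting; the literal 3 is the precision of TIMESTAMP(3). A hand-written equivalent of that read, with an assumed rowtime column at index 1:

    import org.apache.flink.table.data.{GenericRowData, StringData, TimestampData}

    object RowtimeReadSketch {
      def main(args: Array[String]): Unit = {
        // input row with the rowtime column at index 1, stored as TimestampData (TIMESTAMP(3))
        val row = GenericRowData.of(
          StringData.fromString("A"),
          TimestampData.fromEpochMillis(1000L))

        // mirrors the generated snippet:
        //   Long rowtime = <input>.getTimestamp(<modifiedRowtimeIndex>, 3).getMillisecond();
        val rowtime: Long = row.getTimestamp(1, 3).getMillisecond
        println(rowtime) // 1000
      }
    }
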
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index a899456..e2fa85c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.factories.TestValuesTableFactory.{TestSinkContextTableSink, changelogRow}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData.{data1, nullData4, smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.table.utils.LegacyRowResource
@@ -33,7 +33,6 @@ import org.junit.{Rule, Test}
 
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.time.{LocalDateTime, ZoneOffset}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
@@ -629,89 +628,6 @@ class TableSinkITCase extends StreamingTestBase {
   }
 
   @Test
-  def testSinkContext(): Unit = {
-    val data = List(
-      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
-      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
-      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
-
-    val dataId: String = TestValuesTableFactory.registerData(data)
-
-    val sourceDDL =
-      s"""
-         |CREATE TABLE src (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE,
-         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
-         |) WITH (
-         |  'connector' = 'values',
-         |  'data-id' = '$dataId'
-         |)
-      """.stripMargin
-
-    val sinkDDL =
-      s"""
-         |CREATE TABLE sink (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-
-    tEnv.executeSql(sourceDDL)
-    tEnv.executeSql(sinkDDL)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out a source directly with the rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await()
-
-    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
-    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-
-    val sinkDDL2 =
-      s"""
-         |CREATE TABLE sink2 (
-         |  window_rowtime TIMESTAMP(3),
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-    tEnv.executeSql(sinkDDL2)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out with additional operator to generate a new rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    tEnv.executeSql(
-      """
-        |INSERT INTO sink2
-        |SELECT
-        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
-        |  SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
-        |""".stripMargin
-    ).await()
-
-    val expected2 = List(4999, 9999, 19999)
-    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-  }
-
-  @Test
   def testMetadataSourceAndSink(): Unit = {
     val dataId = TestValuesTableFactory.registerData(nullData4)
     // tests metadata at different locations and casting in both sources and sinks
@@ -954,10 +870,4 @@ class TableSinkITCase extends StreamingTestBase {
          |""".stripMargin)
     tEnv.executeSql(s"INSERT INTO $sinkTableName SELECT * FROM $sourceTableName").await()
   }
-
-  // ------------------------------------------------------------------------------------------
-
-  private def localDateTime(epochSecond: Long): LocalDateTime = {
-    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
new file mode 100644
index 0000000..4339774
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.TestSinkContextTableSink
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests that the conversion between [[Table]] and [[DataStream]] does
+ * not lose the rowtime attribute.
+ */
+final class TableToDataStreamITCase extends StreamingTestBase {
+
+  @Test
+  def testHasRowtimeFromTableToAppendStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts FROM src").toAppendStream[Row]
+
+    val expected = List(
+      "+I[A, 1970-01-01T00:00:01], 1000",
+      "+I[B, 1970-01-01T00:00:02], 2000",
+      "+I[C, 1970-01-01T00:00:03], 3000",
+      "+I[D, 1970-01-01T00:00:04], 4000",
+      "+I[E, 1970-01-01T00:00:07], 7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    assertEquals(expected, sink.getResults.sorted)
+
+  }
+
+  @Test
+  def testHasRowtimeFromTableToRetractStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "A"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery(
+      """
+        |SELECT a, ts
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) as rowNum
+        |  FROM src
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    ).toRetractStream[Row]
+
+    val sink = new StringWithTimestampRetractSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToRetractStream")
+
+    val expected = List(
+      "A,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+    assertEquals(expected, sink.getRetractResults.sorted)
+
+    val expectedRetract = List(
+      "(true,A,1970-01-01T00:00:01,1000)",
+      "(false,A,1970-01-01T00:00:01,1000)",
+      "(true,A,1970-01-01T00:00:02,2000)",
+      "(true,C,1970-01-01T00:00:03,3000)",
+      "(true,D,1970-01-01T00:00:04,4000)",
+      "(true,E,1970-01-01T00:00:07,7000)")
+    assertEquals(expectedRetract.sorted, sink.getRawResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromDataStreamToTableBackDataStream(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // seconds to milliseconds
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)
+    tEnv.registerTable("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, rowtime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val expected = List(
+      "+I[A_, 1, 1970-01-01T00:00:01], 1000",
+      "+I[B_, 2, 1970-01-01T00:00:02], 2000",
+      "+I[C_, 3, 1970-01-01T00:00:03], 3000",
+      "+I[D_, 4, 1970-01-01T00:00:04], 4000",
+      "+I[E_, 7, 1970-01-01T00:00:07], 7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromTableToExternalSystem(): Unit = {
+    val data = List(
+      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
+      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
+      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    val sinkDDL =
+      s"""
+         |CREATE TABLE sink (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    tEnv.executeSql(sinkDDL)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out a source directly with the rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await()
+
+    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
+    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+
+    val sinkDDL2 =
+      s"""
+         |CREATE TABLE sink2 (
+         |  window_rowtime TIMESTAMP(3),
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+    tEnv.executeSql(sinkDDL2)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out with additional operator to generate a new rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    tEnv.executeSql(
+      """
+        |INSERT INTO sink2
+        |SELECT
+        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
+        |  SUM(b)
+        |FROM src
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+        |""".stripMargin
+    ).await()
+
+    val expected2 = List(4999, 9999, 19999)
+    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+  }
+
+  private def localDateTime(epochSecond: Long): LocalDateTime = {
+    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+  }
+}
+
+/**
+ * Append test sink that outputs each record together with its timestamp.
+ */
+final class StringWithTimestampSink[T] extends AbstractExactlyOnceSink[T]() {
+
+  override def invoke(value: T, context: SinkFunction.Context): Unit = {
+    localResults += s"${value.toString}, ${context.timestamp()}"
+  }
+
+  override def getResults: List[String] = super.getResults
+}
+
+/**
+ * Retract test sink that outputs each record together with its timestamp.
+ */
+final class StringWithTimestampRetractSink[T](tz: TimeZone) extends
+  TestingRetractSink(tz) {
+
+  def this() {
+    this(TimeZone.getTimeZone("UTC"))
+  }
+
+  override def invoke(v: (Boolean, Row), context: SinkFunction.Context): Unit = {
+    this.synchronized {
+      val rowString = s"${TestSinkUtil.rowToString(v._2, tz)},${context.timestamp()}"
+
+      val tupleString = "(" + v._1.toString + "," + rowString + ")"
+      localResults += tupleString
+      if (v._1) {
+        localRetractResults += rowString
+      } else {
+        val index = localRetractResults.indexOf(rowString)
+        if (index >= 0) {
+          localRetractResults.remove(index)
+        } else {
+          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+            "This is probably an incorrectly implemented test. " +
+            "Try to set the parallelism of the sink to 1.")
+        }
+      }
+    }
+  }
+
+  override def getResults: List[String] = super.getResults
+}