You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/19 08:59:01 UTC

[flink] 01/03: [FLINK-24608][table-planner][table-runtime] Insert rowtime into StreamRecord for SinkProviders

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 548b96e9cb226aaf8d919d900c9326520a5b6dc8
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Wed Nov 10 18:02:45 2021 +0100

    [FLINK-24608][table-planner][table-runtime] Insert rowtime into StreamRecord for SinkProviders
    
    Previously, sinks built using the unified sink framework were missing the `timestamp`
    from the `StreamRecord` when used with `TableAPI`. Introduce a new Operator which sets
    the timestamp on each `StreamRecord` from the corresponding field of each row.
    
    This closes #17759.
---
 .../plan/nodes/exec/common/CommonExecSink.java     |  22 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    |  13 +-
 .../nodes/exec/common/CommonExecSinkITCase.java    | 278 +++++++++++++++++++++
 .../operators/match/RowtimeProcessFunction.java    |  60 -----
 .../sink/StreamRecordTimestampInserter.java        |  54 ++++
 5 files changed, 357 insertions(+), 70 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 8f4b12a8..05145d0f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
 import org.apache.flink.table.runtime.operators.sink.SinkOperator;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
@@ -301,7 +302,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             int rowtimeFieldIndex,
             int sinkParallelism) {
         if (runtimeProvider instanceof DataStreamSinkProvider) {
-            final DataStream<RowData> dataStream = new DataStream<>(env, inputTransform);
+            Transformation<RowData> sinkTransformation =
+                    applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism);
+            final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
             final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
             return provider.consumeDataStream(dataStream).getTransformation();
         } else if (runtimeProvider instanceof TransformationSinkProvider) {
@@ -322,7 +325,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                     sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
         } else if (runtimeProvider instanceof SinkProvider) {
             return new SinkTransformation<>(
-                    inputTransform,
+                    applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism),
                     ((SinkProvider) runtimeProvider).createSink(),
                     getDescription(),
                     sinkParallelism);
@@ -351,6 +354,21 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                 sinkParallelism);
     }
 
+    private Transformation<RowData> applyRowtimeTransformation(
+            Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+        // No rowtime column (index -1): pass the input through without adding an operator.
+        if (rowtimeFieldIndex == -1) {
+            return inputTransform;
+        }
+        return new OneInputTransformation<>(
+                inputTransform,
+                String.format( // name that CommonExecSinkITCase expects in the plan
+                        "StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex),
+                new StreamRecordTimestampInserter(rowtimeFieldIndex), // default precision 3
+                inputTransform.getOutputType(),
+                sinkParallelism);
+    }
+
     private InternalTypeInfo<RowData> getInputTypeInfo() {
         return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 74e0e14..e246c7a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -30,7 +30,6 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.TableConfig;
@@ -55,7 +54,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
 import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
-import org.apache.flink.table.runtime.operators.match.RowtimeProcessFunction;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -259,12 +258,10 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
             Transformation<RowData> transform =
                     new OneInputTransformation<>(
                             inputTransform,
-                            String.format("rowtime field: (%s)", timeOrderFieldIdx),
-                            new ProcessOperator<>(
-                                    new RowtimeProcessFunction(
-                                            timeOrderFieldIdx,
-                                            inputTransform.getOutputType(),
-                                            precision)),
+                            String.format(
+                                    "StreamRecordTimestampInserter(rowtime field: %s)",
+                                    timeOrderFieldIdx),
+                            new StreamRecordTimestampInserter(timeOrderFieldIdx, precision),
                             inputTransform.getOutputType(),
                             inputTransform.getParallelism());
             if (inputsContainSingleton()) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
new file mode 100644
index 0000000..d2cabbc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+
+/** Tests the rowtime-to-StreamRecord timestamp insertion done by {@link CommonExecSink}. */
+public class CommonExecSinkITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+    }
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Test
+    public void testStreamRecordTimestampInserterSinkRuntimeProvider()
+            throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(true))
+                        .source(new TimestampTestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DynamicTableSink.SinkRuntimeProvider
+                                            getSinkRuntimeProvider(
+                                                    DynamicTableSink.Context context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new TestWriter(timestamps))
+                                                        .setCommittableSerializer(
+                                                                TestSink.StringCommittableSerializer
+                                                                        .INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1"; // T1 is both source and sink
+        assertPlan(tableEnv, sqlStmt, true);
+        tableEnv.executeSql(sqlStmt).await();
+        assertTimestampResults(timestamps, rows); // compared by index, without sorting
+    }
+
+    @Test
+    public void testStreamRecordTimestampInserterDataStreamSinkProvider()
+            throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(true))
+                        .source(new TimestampTestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DataStreamSinkProvider getSinkRuntimeProvider(
+                                            DynamicTableSink.Context context) {
+                                        return dataStream ->
+                                                dataStream.addSink(
+                                                        new SinkFunction<RowData>() {
+                                                            @Override
+                                                            public void invoke(
+                                                                    RowData value,
+                                                                    Context context) {
+                                                                addTimestamp(
+                                                                        timestamps,
+                                                                        context.timestamp());
+                                                            }
+                                                        });
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
+        assertPlan(tableEnv, sqlStmt, true);
+        tableEnv.executeSql(sqlStmt).await();
+        Collections.sort(timestamps.get()); // rows above are listed in ascending ts order
+        assertTimestampResults(timestamps, rows);
+    }
+
+    @Test
+    public void testStreamRecordTimestampInserterNotApplied() {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(false))
+                        .source(new TimestampTestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DynamicTableSink.SinkRuntimeProvider
+                                            getSinkRuntimeProvider(
+                                                    DynamicTableSink.Context context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new TestWriter(timestamps))
+                                                        .setCommittableSerializer(
+                                                                TestSink.StringCommittableSerializer
+                                                                        .INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); // no watermark declared
+    }
+
+    private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) {
+        timestamps.applySync(l -> l.add(timestamp));
+    }
+
+    private static void assertPlan(
+            StreamTableEnvironment tableEnv,
+            String sql,
+            boolean containsStreamRecordTimestampInserter) {
+        Matcher<String> matcher = containsString("StreamRecordTimestampInserter(rowtime field: 2");
+        if (!containsStreamRecordTimestampInserter) {
+            matcher = not(matcher);
+        }
+        assertThat(tableEnv.explainSql(sql, ExplainDetail.JSON_EXECUTION_PLAN), matcher);
+    }
+
+    private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) {
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("a", "INT")
+                        .column("b", "STRING")
+                        .column("ts", "TIMESTAMP_LTZ(3)"); // index 2, matched by assertPlan
+        if (withWatermark) {
+            builder.watermark("ts", "ts"); // watermark on column ts
+        }
+        return builder.build();
+    }
+
+    private static void assertTimestampResults(
+            SharedReference<List<Long>> timestamps, List<Row> rows) {
+        assertEquals(rows.size(), timestamps.get().size());
+        for (int i = 0; i < rows.size(); i++) {
+            assertEquals(rows.get(i).getField(2), Instant.ofEpochMilli(timestamps.get().get(i)));
+        }
+    }
+
+    private static class TimestampTestSource extends TableFactoryHarness.ScanSourceBase {
+
+        private final List<Row> rows;
+
+        private TimestampTestSource(List<Row> rows) {
+            super(false);
+            this.rows = rows;
+        }
+
+        @Override
+        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
+                ScanTableSource.ScanContext context) {
+            final DynamicTableSource.DataStructureConverter converter =
+                    context.createDataStructureConverter(
+                            getFactoryContext().getPhysicalRowDataType());
+
+            return SourceFunctionProvider.of(new TestSource(rows, converter), true);
+        }
+    }
+
+    private static class TestSource implements SourceFunction<RowData> {
+
+        private final List<Row> rows;
+        private final DynamicTableSource.DataStructureConverter converter;
+
+        public TestSource(List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
+            this.rows = rows;
+            this.converter = converter;
+        }
+
+        @Override
+        public void run(SourceContext<RowData> ctx) throws Exception {
+            rows.stream().map(row -> (RowData) converter.toInternal(row)).forEach(ctx::collect);
+        }
+
+        @Override
+        public void cancel() {} // nothing to cancel: run() returns once all rows are emitted
+    }
+
+    private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> {
+
+        private final SharedReference<List<Long>> timestamps;
+
+        private TestWriter(SharedReference<List<Long>> timestamps) {
+            this.timestamps = timestamps;
+        }
+
+        @Override
+        public void write(RowData element, Context context) {
+            addTimestamp(timestamps, context.timestamp()); // timestamp seen by the writer
+            super.write(element, context);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
deleted file mode 100644
index e653798..0000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.match;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.Collector;
-
-/**
- * ProcessFunction to copy a timestamp from a {@link RowData} field into the {@link
- * org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
- */
-public class RowtimeProcessFunction extends ProcessFunction<RowData, RowData>
-        implements ResultTypeQueryable<RowData> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int rowtimeIdx;
-    private final int precision;
-    private transient TypeInformation<RowData> returnType;
-
-    public RowtimeProcessFunction(
-            int rowtimeIdx, TypeInformation<RowData> returnType, int precision) {
-        this.rowtimeIdx = rowtimeIdx;
-        this.returnType = returnType;
-        this.precision = precision;
-    }
-
-    @Override
-    public void processElement(RowData value, Context ctx, Collector<RowData> out)
-            throws Exception {
-        long timestamp = value.getTimestamp(rowtimeIdx, precision).getMillisecond();
-        ((TimestampedCollector<RowData>) out).setAbsoluteTimestamp(timestamp);
-        out.collect(value);
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return returnType;
-    }
-}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java
new file mode 100644
index 0000000..c9e18eb
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+/**
+ * Operator that copies each row's rowtime column, in milliseconds, into the StreamRecord timestamp.
+ */
+@Internal
+public class StreamRecordTimestampInserter extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private final int rowtimeIndex; // position of the rowtime column within the row
+    private final int precision; // precision passed to RowData#getTimestamp
+
+    public StreamRecordTimestampInserter(int rowtimeIndex, int precision) {
+        this.rowtimeIndex = rowtimeIndex;
+        this.precision = precision;
+    }
+
+    public StreamRecordTimestampInserter(int rowtimeIndex) {
+        this(rowtimeIndex, 3); // precision defaults to 3
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        final RowData rowData = element.getValue();
+        // the same accessor is used whether the column is TIMESTAMP or TIMESTAMP_LTZ
+        final long rowtime = rowData.getTimestamp(rowtimeIndex, precision).getMillisecond();
+        element.setTimestamp(rowtime); // set on the incoming record, which is then forwarded
+        output.collect(element);
+    }
+}