You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/19 08:59:01 UTC
[flink] 01/03: [FLINK-24608][table-planner][table-runtime] Insert rowtime into StreamRecord for SinkProviders
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 548b96e9cb226aaf8d919d900c9326520a5b6dc8
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Wed Nov 10 18:02:45 2021 +0100
[FLINK-24608][table-planner][table-runtime] Insert rowtime into StreamRecord for SinkProviders
Previously, sinks built with the unified sink framework did not receive the `timestamp`
of the `StreamRecord` when used with the Table API. Introduce a new operator which sets
the timestamp of each `StreamRecord` from the rowtime field of the corresponding row.
This closes #17759.
---
.../plan/nodes/exec/common/CommonExecSink.java | 22 +-
.../plan/nodes/exec/stream/StreamExecMatch.java | 13 +-
.../nodes/exec/common/CommonExecSinkITCase.java | 278 +++++++++++++++++++++
.../operators/match/RowtimeProcessFunction.java | 60 -----
.../sink/StreamRecordTimestampInserter.java | 54 ++++
5 files changed, 357 insertions(+), 70 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 8f4b12a8..05145d0f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
@@ -301,7 +302,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
int rowtimeFieldIndex,
int sinkParallelism) {
if (runtimeProvider instanceof DataStreamSinkProvider) {
- final DataStream<RowData> dataStream = new DataStream<>(env, inputTransform);
+ Transformation<RowData> sinkTransformation =
+ applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism);
+ final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
return provider.consumeDataStream(dataStream).getTransformation();
} else if (runtimeProvider instanceof TransformationSinkProvider) {
@@ -322,7 +325,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
} else if (runtimeProvider instanceof SinkProvider) {
return new SinkTransformation<>(
- inputTransform,
+ applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism),
((SinkProvider) runtimeProvider).createSink(),
getDescription(),
sinkParallelism);
@@ -351,6 +354,21 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
sinkParallelism);
}
+    /** Adds a StreamRecordTimestampInserter, unless rowtimeFieldIndex is -1 (no rowtime). */
+    private Transformation<RowData> applyRowtimeTransformation(
+            Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+        if (rowtimeFieldIndex != -1) {
+            return new OneInputTransformation<>(
+                    inputTransform,
+                    String.format(
+                            "StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex),
+                    new StreamRecordTimestampInserter(rowtimeFieldIndex),
+                    inputTransform.getOutputType(),
+                    sinkParallelism);
+        }
+        return inputTransform;
+    }
+
private InternalTypeInfo<RowData> getInputTypeInfo() {
return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 74e0e14..e246c7a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -30,7 +30,6 @@ import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.TableConfig;
@@ -55,7 +54,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
-import org.apache.flink.table.runtime.operators.match.RowtimeProcessFunction;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -259,12 +258,10 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
Transformation<RowData> transform =
new OneInputTransformation<>(
inputTransform,
- String.format("rowtime field: (%s)", timeOrderFieldIdx),
- new ProcessOperator<>(
- new RowtimeProcessFunction(
- timeOrderFieldIdx,
- inputTransform.getOutputType(),
- precision)),
+ String.format(
+ "StreamRecordTimestampInserter(rowtime field: %s)",
+ timeOrderFieldIdx),
+ new StreamRecordTimestampInserter(timeOrderFieldIdx, precision),
inputTransform.getOutputType(),
inputTransform.getParallelism());
if (inputsContainSingleton()) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
new file mode 100644
index 0000000..d2cabbc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+
+/** ITCase for {@link CommonExecSink}: rowtimes must reach the sinks' StreamRecords. */
+public class CommonExecSinkITCase extends AbstractTestBase {
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+    }
+
+    @Test
+    public void testStreamRecordTimestampInserterSinkRuntimeProvider()
+            throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        // Note: rows 1 and 2 are not in chronological order.
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(true))
+                        .source(new TimestampTestSource(rows))
+                        .sink(testSinkRecordingTimestamps(timestamps))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
+        assertPlan(tableEnv, sqlStmt, true);
+        tableEnv.executeSql(sqlStmt).await();
+        assertTimestampResults(timestamps, rows);
+    }
+
+    @Test
+    public void testStreamRecordTimestampInserterDataStreamSinkProvider()
+            throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(true))
+                        .source(new TimestampTestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DataStreamSinkProvider getSinkRuntimeProvider(
+                                            DynamicTableSink.Context context) {
+                                        return dataStream ->
+                                                dataStream.addSink(
+                                                        new SinkFunction<RowData>() {
+                                                            @Override
+                                                            public void invoke(
+                                                                    RowData value,
+                                                                    Context context) {
+                                                                addTimestamp(
+                                                                        timestamps,
+                                                                        context.timestamp());
+                                                            }
+                                                        });
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
+        assertPlan(tableEnv, sqlStmt, true);
+        tableEnv.executeSql(sqlStmt).await();
+        assertTimestampResults(timestamps, rows);
+    }
+
+    @Test
+    public void testStreamRecordTimestampInserterNotApplied() {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
+                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
+                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
+                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));
+
+        // No watermark is declared on ts here, so no timestamp inserter is expected in the plan.
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaStreamRecordTimestampInserter(false))
+                        .source(new TimestampTestSource(rows))
+                        .sink(testSinkRecordingTimestamps(timestamps))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
+    }
+
+    /** Creates a {@link TestSink}-backed sink whose writer records each record's timestamp. */
+    private static TableFactoryHarness.SinkBase testSinkRecordingTimestamps(
+            SharedReference<List<Long>> timestamps) {
+        return new TableFactoryHarness.SinkBase() {
+            @Override
+            public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
+                    DynamicTableSink.Context context) {
+                return SinkProvider.of(
+                        TestSink.newBuilder()
+                                .setWriter(new TestWriter(timestamps))
+                                .setCommittableSerializer(
+                                        TestSink.StringCommittableSerializer.INSTANCE)
+                                .build());
+            }
+        };
+    }
+
+    private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) {
+        timestamps.applySync(l -> l.add(timestamp));
+    }
+
+    /** Asserts whether the JSON plan contains the inserter for rowtime field 2 ({@code ts}). */
+    private static void assertPlan(
+            StreamTableEnvironment tableEnv,
+            String sql,
+            boolean containsStreamRecordTimestampInserter) {
+        Matcher<String> matcher = containsString("StreamRecordTimestampInserter(rowtime field: 2");
+        if (!containsStreamRecordTimestampInserter) {
+            matcher = not(matcher);
+        }
+        assertThat(tableEnv.explainSql(sql, ExplainDetail.JSON_EXECUTION_PLAN), matcher);
+    }
+
+    private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) {
+        final Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("a", "INT")
+                        .column("b", "STRING")
+                        .column("ts", "TIMESTAMP_LTZ(3)");
+        if (withWatermark) {
+            builder.watermark("ts", "ts");
+        }
+        return builder.build();
+    }
+
+    /** Asserts the recorded timestamps equal the rows' rowtimes (field 2), ignoring order. */
+    private static void assertTimestampResults(
+            SharedReference<List<Long>> timestamps, List<Row> rows) {
+        final List<Instant> expected = new ArrayList<>();
+        for (Row row : rows) {
+            expected.add((Instant) row.getField(2));
+        }
+        final List<Instant> actual = new ArrayList<>();
+        for (Long timestamp : timestamps.get()) {
+            actual.add(Instant.ofEpochMilli(timestamp));
+        }
+        // Sort both sides: arrival order across parallel sink subtasks is not deterministic.
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+
+    private static class TimestampTestSource extends TableFactoryHarness.ScanSourceBase {
+
+        private final List<Row> rows;
+
+        private TimestampTestSource(List<Row> rows) {
+            super(false);
+            this.rows = rows;
+        }
+
+        @Override
+        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
+                ScanTableSource.ScanContext context) {
+            final DynamicTableSource.DataStructureConverter converter =
+                    context.createDataStructureConverter(
+                            getFactoryContext().getPhysicalRowDataType());
+
+            return SourceFunctionProvider.of(new TestSource(rows, converter), true);
+        }
+    }
+
+    private static class TestSource implements SourceFunction<RowData> {
+
+        private final List<Row> rows;
+        private final DynamicTableSource.DataStructureConverter converter;
+
+        public TestSource(List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
+            this.rows = rows;
+            this.converter = converter;
+        }
+
+        @Override
+        public void run(SourceContext<RowData> ctx) throws Exception {
+            rows.stream().map(row -> (RowData) converter.toInternal(row)).forEach(ctx::collect);
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> {
+
+        private final SharedReference<List<Long>> timestamps;
+
+        private TestWriter(SharedReference<List<Long>> timestamps) {
+            this.timestamps = timestamps;
+        }
+
+        @Override
+        public void write(RowData element, Context context) {
+            addTimestamp(timestamps, context.timestamp());
+            super.write(element, context);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
deleted file mode 100644
index e653798..0000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.match;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.Collector;
-
-/**
- * ProcessFunction to copy a timestamp from a {@link RowData} field into the {@link
- * org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
- */
-public class RowtimeProcessFunction extends ProcessFunction<RowData, RowData>
- implements ResultTypeQueryable<RowData> {
-
- private static final long serialVersionUID = 1L;
-
- private final int rowtimeIdx;
- private final int precision;
- private transient TypeInformation<RowData> returnType;
-
- public RowtimeProcessFunction(
- int rowtimeIdx, TypeInformation<RowData> returnType, int precision) {
- this.rowtimeIdx = rowtimeIdx;
- this.returnType = returnType;
- this.precision = precision;
- }
-
- @Override
- public void processElement(RowData value, Context ctx, Collector<RowData> out)
- throws Exception {
- long timestamp = value.getTimestamp(rowtimeIdx, precision).getMillisecond();
- ((TimestampedCollector<RowData>) out).setAbsoluteTimestamp(timestamp);
- out.collect(value);
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return returnType;
- }
-}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java
new file mode 100644
index 0000000..c9e18eb
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+/** Sets the timestamp of each {@link StreamRecord} from the rowtime field of its row. */
+@Internal
+public class StreamRecordTimestampInserter extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+    private final int rowtimeIndex;
+    private final int precision;
+
+    /** Reads the rowtime at {@code rowtimeIndex} as a timestamp of the given precision. */
+    public StreamRecordTimestampInserter(int rowtimeIndex, int precision) {
+        this.rowtimeIndex = rowtimeIndex;
+        this.precision = precision;
+    }
+
+    /** Uses precision 3, i.e. the rowtime is read as a millisecond-precision timestamp. */
+    public StreamRecordTimestampInserter(int rowtimeIndex) {
+        this(rowtimeIndex, 3);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        // The rowtime column may be TIMESTAMP or TIMESTAMP_LTZ; both are read as TimestampData.
+        final RowData rowData = element.getValue();
+        element.setTimestamp(rowData.getTimestamp(rowtimeIndex, precision).getMillisecond());
+        output.collect(element);
+    }
+}