You are viewing a plain text version of this content. The canonical link is available in the original HTML version of this archived message.
Posted to commits@flink.apache.org by fp...@apache.org on 2021/12/01 08:05:24 UTC
[flink] 01/03: [FLINK-24596][table] Allow using unified Sinks with the DataStreamSinkProvider
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7cd97b02d7f39b5190d27fbaf6e6287f127b9a9
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Thu Oct 21 10:20:25 2021 +0200
[FLINK-24596][table] Allow using unified Sinks with the DataStreamSinkProvider
---
.../connectors/cassandra/CassandraSink.java | 2 +-
.../flink/streaming/api/datastream/DataStream.java | 2 +-
.../streaming/api/datastream/DataStreamSink.java | 8 ++-
.../streaming/api/datastream/KeyedStream.java | 4 +-
.../api/operators/collect/CollectStreamSink.java | 8 ++-
.../transformations/LegacySinkTransformation.java | 5 +-
.../LegacySinkTransformationTranslator.java | 2 +-
.../api/datastream/DataStreamSinkTest.java | 10 ++-
.../plan/nodes/exec/batch/BatchExecLegacySink.java | 3 +-
.../nodes/exec/common/CommonExecLegacySink.java | 18 +++---
.../nodes/exec/stream/StreamExecLegacySink.java | 3 +-
.../nodes/exec/common/CommonExecSinkITCase.java | 75 +++++++++++++++++++---
12 files changed, 105 insertions(+), 35 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 89aa10a..bf44f0a 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -62,7 +62,7 @@ public class CassandraSink<IN> {
}
private LegacySinkTransformation<IN> getSinkTransformation() {
- return sink1.getTransformation();
+ return sink1.getLegacyTransformation();
}
private Transformation<IN> getTransformation() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 726728b..1c1bca6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1244,7 +1244,7 @@ public class DataStream<T> {
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
- getExecutionEnvironment().addOperator(sink.getTransformation());
+ getExecutionEnvironment().addOperator(sink.getLegacyTransformation());
return sink;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index eea988e..3aca069 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -64,7 +65,12 @@ public class DataStreamSink<T> {
/** Returns the transformation that contains the actual sink operator of this sink. */
@Internal
- public LegacySinkTransformation<T> getTransformation() {
+ public Transformation<T> getTransformation() {
+ return transformation;
+ }
+
+ @Internal
+ public LegacySinkTransformation<T> getLegacyTransformation() {
if (transformation instanceof LegacySinkTransformation) {
return (LegacySinkTransformation<T>) transformation;
} else {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c299a68..ada3a82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -299,8 +299,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
@Override
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
DataStreamSink<T> result = super.addSink(sinkFunction);
- result.getTransformation().setStateKeySelector(keySelector);
- result.getTransformation().setStateKeyType(keyType);
+ result.getLegacyTransformation().setStateKeySelector(keySelector);
+ result.getLegacyTransformation().setStateKeyType(keyType);
return result;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
index d53c3e6..a68043d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
@@ -18,10 +18,12 @@
package org.apache.flink.streaming.api.operators.collect;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
/**
* A {@link DataStreamSink} which is used to collect results of a data stream. It completely
@@ -30,17 +32,17 @@ import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@Internal
public class CollectStreamSink<T> extends DataStreamSink<T> {
- private final LegacySinkTransformation<T> transformation;
+ private final PhysicalTransformation<T> transformation;
public CollectStreamSink(DataStream<T> inputStream, CollectSinkOperatorFactory<T> factory) {
super(inputStream, (CollectSinkOperator<T>) factory.getOperator());
this.transformation =
- new LegacySinkTransformation<>(
+ new LegacySinkTransformation<T>(
inputStream.getTransformation(), "Collect Stream Sink", factory, 1);
}
@Override
- public LegacySinkTransformation<T> getTransformation() {
+ public Transformation<T> getTransformation() {
return transformation;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
index d240329..8977c70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -40,7 +39,7 @@ import java.util.List;
* @param <T> The type of the elements in the input {@code LegacySinkTransformation}
*/
@Internal
-public class LegacySinkTransformation<T> extends PhysicalTransformation<Object> {
+public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
private final Transformation<T> input;
@@ -70,7 +69,7 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<Object>
String name,
StreamOperatorFactory<Object> operatorFactory,
int parallelism) {
- super(name, TypeExtractor.getForClass(Object.class), parallelism);
+ super(name, input.getOutputType(), parallelism);
this.input = input;
this.operatorFactory = operatorFactory;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
index fa2b1d3..378a717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
@Internal
public class LegacySinkTransformationTranslator<IN>
- extends SimpleTransformationTranslator<Object, LegacySinkTransformation<IN>> {
+ extends SimpleTransformationTranslator<IN, LegacySinkTransformation<IN>> {
@Override
protected Collection<Integer> translateForBatchInternal(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
index b8d39ad..b2de771 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
@@ -17,18 +17,24 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
/** Unit test for {@link DataStreamSink}. */
public class DataStreamSinkTest {
- @Test(expected = IllegalStateException.class)
+ @Test
public void throwExceptionWhenGettingTransformationWithNewSinkAPI() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+ final Transformation<Integer> transformation =
+ env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+ assertTrue(transformation instanceof SinkTransformation);
}
@Test(expected = UnsupportedOperationException.class)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
index 5aa1dea..049920c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
@@ -37,8 +37,7 @@ import java.lang.reflect.Modifier;
*
* @param <T> The return type of the {@link TableSink}.
*/
-public class BatchExecLegacySink<T> extends CommonExecLegacySink<T>
- implements BatchExecNode<Object> {
+public class BatchExecLegacySink<T> extends CommonExecLegacySink<T> implements BatchExecNode<T> {
public BatchExecLegacySink(
TableSink<T> tableSink,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
index 8cb2b66..ca29944 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
@@ -58,8 +58,8 @@ import java.util.List;
*
* @param <T> The return type of the {@link TableSink}.
*/
-public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
- implements MultipleTransformationTranslator<Object> {
+public abstract class CommonExecLegacySink<T> extends ExecNodeBase<T>
+ implements MultipleTransformationTranslator<T> {
protected final TableSink<T> tableSink;
protected final @Nullable String[] upsertKeys;
protected final boolean needRetraction;
@@ -82,7 +82,7 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
@SuppressWarnings("unchecked")
@Override
- protected Transformation<Object> translateToPlanInternal(PlannerBase planner) {
+ protected Transformation<T> translateToPlanInternal(PlannerBase planner) {
if (tableSink instanceof StreamTableSink) {
final Transformation<T> transform;
if (tableSink instanceof RetractStreamTableSink) {
@@ -121,8 +121,9 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
}
final DataStream<T> dataStream = new DataStream<T>(planner.getExecEnv(), transform);
- final DataStreamSink<?> dsSink =
- ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream);
+ final DataStreamSink<T> dsSink =
+ (DataStreamSink<T>)
+ ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream);
if (dsSink == null) {
throw new TableException(
String.format(
@@ -131,15 +132,14 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
+ "However, %s doesn't implement this method.",
tableSink.getClass().getCanonicalName()));
}
- return dsSink.getTransformation();
+ return dsSink.getLegacyTransformation();
} else if (tableSink instanceof DataStreamTableSink) {
// In case of table to DataStream through
// StreamTableEnvironment#toAppendStream/toRetractStream,
// we insert a DataStreamTableSink that wraps the given DataStream as a LogicalSink. It
// is no real table sink, so we just need translate its input to Transformation.
- return (Transformation<Object>)
- translateToTransformation(
- planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag());
+ return translateToTransformation(
+ planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag());
} else {
throw new TableException(
String.format(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
index af9ee1d..b1e4ee6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
@@ -40,8 +40,7 @@ import java.util.stream.Collectors;
*
* @param <T> The return type of the {@link TableSink}.
*/
-public class StreamExecLegacySink<T> extends CommonExecLegacySink<T>
- implements StreamExecNode<Object> {
+public class StreamExecLegacySink<T> extends CommonExecLegacySink<T> implements StreamExecNode<T> {
public StreamExecLegacySink(
TableSink<T> tableSink,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 744ced7..60c489b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -50,7 +50,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import static org.apache.flink.table.api.DataTypes.INT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
@@ -93,7 +95,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
DynamicTableSink.Context context) {
return SinkProvider.of(
TestSink.newBuilder()
- .setWriter(new TestWriter(timestamps))
+ .setWriter(
+ new TestTimestampWriter(timestamps))
.setCommittableSerializer(
TestSink.StringCommittableSerializer
.INSTANCE)
@@ -136,7 +139,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
public void invoke(
RowData value,
Context context) {
- addTimestamp(
+ addElement(
timestamps,
context.timestamp());
}
@@ -153,6 +156,35 @@ public class CommonExecSinkITCase extends AbstractTestBase {
}
@Test
+ public void testUnifiedSinksAreUsableWithDataStreamSinkProvider()
+ throws ExecutionException, InterruptedException {
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ final SharedReference<List<RowData>> fetched = sharedObjects.add(new ArrayList<>());
+ final List<Row> rows = Arrays.asList(Row.of(1), Row.of(2));
+
+ final TableDescriptor sourceDescriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(Schema.newBuilder().column("a", INT()).build())
+ .source(new TimestampTestSource(rows))
+ .sink(
+ new TableFactoryHarness.SinkBase() {
+ @Override
+ public DataStreamSinkProvider getSinkRuntimeProvider(
+ DynamicTableSink.Context context) {
+ return buildRecordTestSinkProvider(fetched);
+ }
+ })
+ .build();
+ tableEnv.createTable("T1", sourceDescriptor);
+ String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
+ tableEnv.executeSql(sqlStmt).await();
+ final List<Integer> fetchedRows =
+ fetched.get().stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList());
+ assertEquals(fetchedRows.get(0).intValue(), 1);
+ assertEquals(fetchedRows.get(1).intValue(), 2);
+ }
+
+ @Test
public void testStreamRecordTimestampInserterNotApplied() {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
@@ -175,7 +207,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
DynamicTableSink.Context context) {
return SinkProvider.of(
TestSink.newBuilder()
- .setWriter(new TestWriter(timestamps))
+ .setWriter(
+ new TestTimestampWriter(timestamps))
.setCommittableSerializer(
TestSink.StringCommittableSerializer
.INSTANCE)
@@ -187,8 +220,19 @@ public class CommonExecSinkITCase extends AbstractTestBase {
assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
}
- private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) {
- timestamps.applySync(l -> l.add(timestamp));
+ private static DataStreamSinkProvider buildRecordTestSinkProvider(
+ SharedReference<List<RowData>> fetched) {
+ return dataStream ->
+ dataStream.sinkTo(
+ TestSink.newBuilder()
+ .setWriter(new RecordWriter(fetched))
+ .setCommittableSerializer(
+ TestSink.StringCommittableSerializer.INSTANCE)
+ .build());
+ }
+
+ private static <T> void addElement(SharedReference<List<T>> elements, T element) {
+ elements.applySync(l -> l.add(element));
}
private static void assertPlan(
@@ -264,17 +308,32 @@ public class CommonExecSinkITCase extends AbstractTestBase {
public void cancel() {}
}
- private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> {
+ private static class TestTimestampWriter extends TestSink.DefaultSinkWriter<RowData> {
private final SharedReference<List<Long>> timestamps;
- private TestWriter(SharedReference<List<Long>> timestamps) {
+ private TestTimestampWriter(SharedReference<List<Long>> timestamps) {
this.timestamps = timestamps;
}
@Override
public void write(RowData element, Context context) {
- addTimestamp(timestamps, context.timestamp());
+ addElement(timestamps, context.timestamp());
+ super.write(element, context);
+ }
+ }
+
+ private static class RecordWriter extends TestSink.DefaultSinkWriter<RowData> {
+
+ private final SharedReference<List<RowData>> rows;
+
+ private RecordWriter(SharedReference<List<RowData>> rows) {
+ this.rows = rows;
+ }
+
+ @Override
+ public void write(RowData element, Context context) {
+ addElement(rows, element);
super.write(element, context);
}
}