Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2023/01/03 09:50:14 UTC
[GitHub] [incubator-seatunnel] TyrantLucifer commented on issue #3271: [Umbrella][Feature][Connector-V2] Support create table automaticly
TyrantLucifer commented on issue #3271:
URL: https://github.com/apache/incubator-seatunnel/issues/3271#issuecomment-1369569602
## Overview
As we know, the ability to create tables automatically before writing data is important to many users. Automatic table creation usually requires mapping data types between systems. Fortunately, this is one of our strengths, so I am proposing this feature to the community.
## Design
In a sink connector, the data type is available once `SeaTunnelSink#setTypeInfo` has been called, so we can add a new lifecycle method to `SeaTunnelSink` that runs after it:
```java
default void createTable(SeaTunnelRowType seaTunnelRowType) {
    // do nothing
}
```
Each connector overrides this method with its own implementation, and the starter module calls it right after `setTypeInfo`:
```java
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
    DataStream<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
        DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
        seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        seaTunnelSink.createTable((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
        if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
            int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
            dataStreamSink.setParallelism(parallelism);
        }
    }
    // the sink is the last stream
    return null;
}
```
```
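To make the connector side of this more concrete, here is a minimal, self-contained sketch of how a connector might override the hook and map the row type to a DDL statement. It does not use the real SeaTunnel classes: `RowType`, `FieldType`, `Sink`, `SqlSink`, `toSqlType` and `buildDdl` are hypothetical stand-ins invented for illustration, and the SQL type mapping is an assumption rather than the mapping of any particular database.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class CreateTableSketch {

    // Hypothetical stand-in for SeaTunnel's data types; not the real API.
    enum FieldType { INT, BIGINT, DOUBLE, STRING, BOOLEAN }

    // Hypothetical stand-in for SeaTunnelRowType: ordered field name -> type.
    static final class RowType {
        final Map<String, FieldType> fields = new LinkedHashMap<>();

        RowType field(String name, FieldType type) {
            fields.put(name, type);
            return this;
        }
    }

    // Mirrors the proposed hook: a default no-op, so existing sinks need no changes.
    interface Sink {
        default void createTable(RowType rowType) {
            // do nothing
        }
    }

    // A hypothetical SQL-style sink that opts in by overriding the hook.
    static final class SqlSink implements Sink {
        private final String table;
        String lastDdl; // kept only so the sketch can show the generated statement

        SqlSink(String table) {
            this.table = table;
        }

        @Override
        public void createTable(RowType rowType) {
            lastDdl = buildDdl(table, rowType);
            // A real connector would execute the statement against the target system here.
        }
    }

    // Assumed type mapping, for illustration only.
    static String toSqlType(FieldType type) {
        switch (type) {
            case INT: return "INT";
            case BIGINT: return "BIGINT";
            case DOUBLE: return "DOUBLE";
            case BOOLEAN: return "BOOLEAN";
            case STRING: return "VARCHAR(255)";
            default: throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Builds a CREATE TABLE statement from the row type, one column per field.
    static String buildDdl(String table, RowType rowType) {
        StringJoiner columns = new StringJoiner(", ", "(", ")");
        rowType.fields.forEach((name, type) -> columns.add(name + " " + toSqlType(type)));
        return "CREATE TABLE IF NOT EXISTS " + table + " " + columns;
    }

    public static void main(String[] args) {
        RowType rowType = new RowType()
                .field("id", FieldType.BIGINT)
                .field("name", FieldType.STRING);
        SqlSink sink = new SqlSink("users");
        sink.createTable(rowType);
        // prints: CREATE TABLE IF NOT EXISTS users (id BIGINT, name VARCHAR(255))
        System.out.println(sink.lastDdl);
    }
}
```

Because the hook is a default method, sinks that do not override it keep their current behavior. The sketch leaves out identifier quoting and dialect-specific types, which a real connector would need to handle.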