You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2023/01/03 09:50:14 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on issue #3271: [Umbrella][Feature][Connector-V2] Support create table automaticly

TyrantLucifer commented on issue #3271:
URL: https://github.com/apache/incubator-seatunnel/issues/3271#issuecomment-1369569602

   ## Overview
   
   As we know, the ability to automate table before writing out data is important to many users. Mapping of types is often required for automatic table creation, fortunately, this is one of our strengths, so I proposal this feature in community.
   
   ## Design
   
   In sink connector, The data type is obtained after the `SeaTunnelSink#setTypeInfo`, so we can add a new life cycle in `SeaTunnelSink` to do this thing.
   
   ```java
       default void createTable(SeaTunnelRowType seaTunnelRowType) {
           // do nothing
       }
   ```
   Override and implement this method yourself for different connectors and execute it in starter module:
   
   ```java
       public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
           DataStream<Row> input = upstreamDataStreams.get(0);
           for (int i = 0; i < plugins.size(); i++) {
               Config sinkConfig = pluginConfigs.get(i);
               SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
               DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
               seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
               seaTunnelSink.createTable((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
               DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
               if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                   int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                   dataStreamSink.setParallelism(parallelism);
               }
           }
           // the sink is the last stream
           return null;
       }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org