You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "Hisoka-X (via GitHub)" <gi...@apache.org> on 2023/01/30 11:34:08 UTC

[GitHub] [incubator-seatunnel] Hisoka-X opened a new issue, #4005: [Feature][Module Name] Feature title

Hisoka-X opened a new issue, #4005:
URL: https://github.com/apache/incubator-seatunnel/issues/4005

   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   ## Status quo
   In order to realize the automatic table creation function in SaveMode, the Sink side needs to receive the type information of the data passed from the upstream, and then build its own table
   ## Question
   Taking Clickhouse as an example, if we want to automatically create a data table, before our program reads and writes data, the corresponding data table is generated according to the upstream data structure.
   The upstream type structure is:
   ```java
   id int,
   name String,
   age int
   ```
   The table creation statement we expect to be executed is:
   ```sql
   CREATE TABLE user (
        idUInt32,
        name String DEFAULT 'Unknown',
        age UInt32
   ) ENGINE = MergeTree()
   PRIMARY KEY id
   ```
   - Question 1:  
   How do we know that the primary key is id and not name?
   - Question 2:
   Even if we know that the primary key is id, how can we let the program know that the primary key is id, and then help us automatically create this table?
   - Question 3:
   What if I want to use ReplacingMergeTree instead of MergeTree?
   - Question 4:
   Where can I find out the default value?
   ## Solution
   It can be seen that if you want to get a complete and usable table creation statement, you can divide the information source into two parts. Information at the field level, and information at the table level.
   Field-level information includes the field name, field type, field default value, and whether the field is a primary key. The information at the table level includes the table name, the engine type of the table, and other characteristic identifications belonging to different data sources.
   So the current data structure has no way to meet our needs, we need to make some changes to the original data structure:
   ```java
   // The Code Segment
   public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
        private static final long serialVersionUID = 2L;
   
        /**
         * The field name of the {@link SeaTunnelRow}.
         */
        private final String[] fieldNames;
        /**
         * The type of the field.
         */
        private final SeaTunnelDataType<?>[] fieldTypes;
   }
   ```
   becomes:
   ```java
   // The Code Segment
   public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
        private static final long serialVersionUID = 2L;
   
        /**
         * The field name of the {@link SeaTunnelRow}.
         */
        private final String[] fieldNames;
        /**
         * The columns of the {@link SeaTunnelRow}.
         */
        private final SeaTunnelColumn<?>[] columns;
   }
   
   public class SeaTunnelColumn<T> implements Serializable {
   
        private SeaTunnelDataType<T> fieldType;
   
        private String fieldName;
   
        private boolean isPrimaryKey = false;
   
        private boolean isNullable = true;
   
        private T defaultValue;
   
   }
   
   ```
   Save the field information by adding the SeaTunnelColumn type, and then get the information in the Sink Connector. Now, our problems 2 and 4 are solved, and we will solve problem 1 next.
   Usually, we need our Source to define the SeaTunnelRowType. Now we need to define the internal SeaTunnelColumn information while defining the SeaTunnelRowType to adapt to read the structural information of the Source. If you need to modify the SeaTunnelColumn definition of the field in the data processing process, you can use the Transform of FieldMapper to achieve it (the Transform needs to be adapted and modified).
   Next, we use templates to solve problem 3.
   Since different sinks have different syntax for creating tables, we take SQL as an example. Similar ElasticSearch Index creation and Hbase Table creation can also be created through this method. The template is an option. If the user does not configure it, the default template will be used to create the table, reducing the complexity of the user's configuration.
   Add a template field in Sink's Config, such as:
   ```javascript
   sink {
        clickhouse {
            host = "clickhouse:8123"
            database = "default"
            table = "sink_table"
            username = "default"
            password = ""
           
            save_mode = "DROP_SCHEMA"
            save_mode_create_template = ```
                                            CREATE TABLE user (
                                                ${rowtype_fields}
                                            ) ENGINE = MergeTree()
                                            PRIMARY KEY ${rowtype_primary_key}
                                        ```
        }
   }
   ```
   In this way, our Clickhouse Sink Connector can fill `rowtype_fields` and `rowtype_primary_key` according to the obtained SeaTunnelRowType. It can be seen that the save_mode_create_template we provide is a set of standards, rather than filling in the logic of the framework layer.
   By default, we can choose not to fill in `save_mode_create_template`, when we need to modify the engine, or modify other creation specific values, we can choose to fill in `save_mode_create_template`, such as modifying the engine to ReplacingMergeTree:
   ```javascript
   sink {
        clickhouse {
            host = "clickhouse:8123"
            database = "default"
            table = "sink_table"
            username = "default"
            password = ""
           
            save_mode = "DROP_SCHEMA"
            save_mode_create_template = ```
                                            CREATE TABLE user (
                                                ${rowtype_fields}
                                            ) ENGINE = ReplacingMergeTree()
                                            PRIMARY KEY ${rowtype_primary_key}
                                        ```
        }
   }
   ```
   
   ## Disadvantages
   The current RowType analysis of Spark/Flink will convert the RowType processed by the engine analysis generation operator, and then generate a new SeaTunnelRowType, and the SeaTunnelRowType content will be lost. This part of the logic needs to be modified to skip the RowType generated by the engine. Directly use the SeaTunnelRowType generated inside SeaTunnel. In this case, Transform-V1 cannot be supported, but we will abandon V1 in the next version, which has no effect.
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on issue #4005: [Feature][SaveMode] Support Create Table Use Custom Column Type And Template

Posted by "ashulin (via GitHub)" <gi...@apache.org>.
ashulin commented on issue #4005:
URL: https://github.com/apache/incubator-seatunnel/issues/4005#issuecomment-1408491608

   IMO, you can use the existing `CatalogTable` and `TableSchema` to fulfill the requirements without changing `RowType`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on issue #4005: [Feature][SaveMode] Support Create Table Use Custom Column Type And Template

Posted by "ashulin (via GitHub)" <gi...@apache.org>.
ashulin commented on issue #4005:
URL: https://github.com/apache/incubator-seatunnel/issues/4005#issuecomment-1408518656

   We need to allow `SeaTunnelSchema` to create a `CatalogTable`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org