You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/21 09:04:58 UTC

[inlong] branch master updated: [INLONG-6174][Sort] MySql connector support meta data with debezium format (#6210)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb48b847f [INLONG-6174][Sort] MySql connector support meta data with debezium format  (#6210)
eb48b847f is described below

commit eb48b847f28d78e2d349730143912e9fedeca49d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Oct 21 17:04:51 2022 +0800

    [INLONG-6174][Sort] MySql connector support meta data with debezium format  (#6210)
---
 .../org/apache/inlong/common/enums/MetaField.java  |  22 ++++
 .../org/apache/inlong/sort/protocol/Metadata.java  |   4 +
 .../protocol/node/extract/MySqlExtractNode.java    |  13 ++-
 .../sort/protocol/node/load/KafkaLoadNode.java     |  10 +-
 .../node/extract/MySqlExtractNodeTest.java         |   9 +-
 .../sort/protocol/node/load/KafkaLoadNodeTest.java |   3 +-
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 124 ++++++++++++++++-----
 .../apache/inlong/sort/parser/AllMigrateTest.java  |   2 +-
 inlong-sort/sort-formats/format-json/pom.xml       |   4 +
 .../sort/formats/json/debezium/DebeziumJson.java   |  48 ++++++++
 10 files changed, 199 insertions(+), 40 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index 8563c2234..f88da9c4d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -61,14 +61,36 @@ public enum MetaField {
 
     /**
      * Represents a canal json of a record in database (in string format)
+     * @deprecated please use DATA_CANAL \ DATA_DEBEZIUM
      */
     DATA,
 
+    /**
+     * Represents a canal json of a record in database (in string format)
+     */
+    DATA_CANAL,
+
+    /**
+     * Represents a debezium json of a record in database (in string format)
+     */
+    DATA_DEBEZIUM,
+
     /**
      * Represents a canal json of a record in database (in bytes format)
+     * @deprecated please use DATA_BYTES_DEBEZIUM \ DATA_CANAL_BYTES
      */
     DATA_BYTES,
 
+    /**
+     * Represents a debezium json of a record in database (in bytes format)
+     */
+    DATA_BYTES_DEBEZIUM,
+
+    /**
+     * Represents a canal json of a record in database (in bytes format)
+     */
+    DATA_BYTES_CANAL,
+
     /**
      * The value of the field before update. Currently, it is used for MySQL database.
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 496b852a2..f4822fb80 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -78,7 +78,9 @@ public interface Metadata {
             case TABLE_NAME:
             case DATABASE_NAME:
             case OP_TYPE:
+            case DATA_CANAL:
             case DATA:
+            case DATA_DEBEZIUM:
             case COLLECTION_NAME:
             case SCHEMA_NAME:
                 metadataType = "STRING";
@@ -106,6 +108,8 @@ public interface Metadata {
                 metadataType = "ARRAY<MAP<STRING, STRING>>";
                 break;
             case DATA_BYTES:
+            case DATA_BYTES_DEBEZIUM:
+            case DATA_BYTES_CANAL:
                 metadataType = "BYTES";
                 break;
             default:
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index e561610b8..f674ef8c3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -280,7 +280,13 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
                 break;
             case DATA:
             case DATA_BYTES:
-                metadataKey = "meta.data";
+            case DATA_CANAL:
+            case DATA_BYTES_CANAL:
+                metadataKey = "meta.data_canal";
+                break;
+            case DATA_DEBEZIUM:
+            case DATA_BYTES_DEBEZIUM:
+                metadataKey = "meta.data_debezium";
                 break;
             case IS_DDL:
                 metadataKey = "meta.is_ddl";
@@ -317,9 +323,10 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATA,
+        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATA_CANAL,
             MetaField.DATABASE_NAME, MetaField.OP_TYPE, MetaField.OP_TS, MetaField.IS_DDL,
             MetaField.TS, MetaField.SQL_TYPE, MetaField.MYSQL_TYPE, MetaField.PK_NAMES,
-            MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA_BYTES);
+            MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA_BYTES_DEBEZIUM,
+            MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_CANAL, MetaField.DATA, MetaField.DATA_BYTES);
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 13b741217..be326b478 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -227,7 +227,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
                 metadataKey = "value.op-type";
                 break;
             case DATA:
-                metadataKey = "value.data";
+            case DATA_CANAL:
+                metadataKey = "value.data_canal";
                 break;
             case IS_DDL:
                 metadataKey = "value.is-ddl";
@@ -255,8 +256,9 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE, MetaField.DATABASE_NAME,
-                MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, MetaField.OP_TS, MetaField.IS_DDL,
-                MetaField.MYSQL_TYPE, MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA);
+        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE,
+            MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
+            MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
+            MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
index 9a1248642..19811532b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
@@ -49,7 +49,7 @@ public class MySqlExtractNodeTest extends SerializeBaseTest<MySqlExtractNode> {
         Map<MetaField, String> formatMap = new HashMap<>();
         formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'meta.table_name' VIRTUAL");
-        formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data' VIRTUAL");
+        formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'meta.database_name' VIRTUAL");
         formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'meta.op_type' VIRTUAL");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL");
@@ -60,7 +60,12 @@ public class MySqlExtractNodeTest extends SerializeBaseTest<MySqlExtractNode> {
         formatMap.put(MetaField.PK_NAMES, "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL");
         formatMap.put(MetaField.BATCH_ID, "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL");
         formatMap.put(MetaField.UPDATE_BEFORE, "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL");
-        formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+
         MySqlExtractNode node = getTestObject();
         boolean formatEquals = true;
         for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index 52588d055..a9f495c28 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -50,7 +50,8 @@ public class KafkaLoadNodeTest extends SerializeBaseTest<KafkaLoadNode> {
     public void testMetaFields() {
         Map<MetaField, String> formatMap = new HashMap<>();
         formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
-        formatMap.put(MetaField.DATA, "STRING METADATA FROM 'value.data'");
+        formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'value.data_canal'");
+        formatMap.put(MetaField.DATA, "STRING METADATA FROM 'value.data_canal'");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'");
         formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'");
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 5f79b4980..fcfb636ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
 import java.util.LinkedHashMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
@@ -35,6 +36,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.inlong.sort.cdc.debezium.table.MetadataConverter;
 import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -97,8 +100,27 @@ public enum MySqlReadableMetadata {
                 }
             }),
 
+    DATA_DEFAULT(
+        "meta.data",
+        DataTypes.STRING(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                return null;
+            }
+
+            @Override
+            public Object read(SourceRecord record,
+                @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+                // construct canal json
+                return getCanalData(record, (GenericRowData) rowData, tableSchema);
+            }
+        }),
+
     DATA(
-            "meta.data",
+            "meta.data_canal",
             DataTypes.STRING(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
@@ -112,37 +134,49 @@ public enum MySqlReadableMetadata {
                 public Object read(SourceRecord record,
                                    @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
                     // construct canal json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
-                    // tableName
-                    String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
-                    // databaseName
-                    String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
-                    // opTs
-                    long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
-                    // ts
-                    long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
-                    // actual data
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) data.getField(0);
-                    List<Map<String, Object>> dataList = new ArrayList<>();
-                    dataList.add(field);
-
-                    CanalJson canalJson = CanalJson.builder()
-                            .data(dataList).database(databaseName)
-                            .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
-                            .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-                            .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
-
-                    try {
-                        ObjectMapper objectMapper = new ObjectMapper();
-                        return StringData.fromString(objectMapper.writeValueAsString(canalJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when get meta data", e);
-                    }
+                    return getCanalData(record, (GenericRowData) rowData, tableSchema);
                 }
             }),
 
+    DATA_DEBEZIUM(
+        "meta.data_debezium",
+        DataTypes.STRING(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                return null;
+            }
+
+            @Override
+            public Object read(SourceRecord record,
+                @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+                // construct debezium json
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+                GenericRowData data = (GenericRowData) rowData;
+                Map<String, Object> field = (Map<String, Object>) data.getField(0);
+
+                Source source = Source.builder().db(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY))
+                    .table(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY))
+                    .name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+                    .sqlType(getSqlType(tableSchema))
+                    .pkNames(getPkNames(tableSchema))
+                    .mysqlType(getMysqlType(tableSchema))
+                    .build();
+                DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
+                        .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record))
+                    .tableChange(tableSchema).build();
+
+                try {
+                    return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+                } catch (Exception e) {
+                    throw new IllegalStateException("exception occurs when get meta data", e);
+                }
+            }
+        }),
+
     /**
      * Name of the table that contain the row. .
      */
@@ -379,9 +413,41 @@ public enum MySqlReadableMetadata {
                 }
             });
 
+    private static StringData getCanalData(SourceRecord record, GenericRowData rowData,
+        TableChange tableSchema) {
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+        // tableName
+        String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+        // databaseName
+        String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+        // opTs
+        long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+        // ts
+        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+        // actual data
+        GenericRowData data = rowData;
+        Map<String, Object> field = (Map<String, Object>) data.getField(0);
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        dataList.add(field);
+
+        CanalJson canalJson = CanalJson.builder()
+            .data(dataList).database(databaseName)
+            .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
+            .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+            .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+
+        try {
+            return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+        } catch (Exception e) {
+            throw new IllegalStateException("exception occurs when get meta data", e);
+        }
+    }
+
     private final String key;
     private final DataType dataType;
     private final MetadataConverter converter;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
         this.key = key;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 2e4f45e6d..7b80c22ea 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -68,7 +68,7 @@ public class AllMigrateTest {
 
     private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
         List<FieldInfo> fields = Collections.singletonList(
-            new MetaFieldInfo("data", MetaField.DATA_BYTES));
+            new MetaFieldInfo("data", MetaField.DATA_BYTES_DEBEZIUM));
         Map<String, String> option = new HashMap<>();
         option.put("append-mode", "true");
         option.put("migrate-all", "true");
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
index 2c3da00ab..c317bfebb 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -75,6 +75,10 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
new file mode 100644
index 000000000..51d60069d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.debezium;
+
+import io.debezium.relational.history.TableChanges;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class DebeziumJson {
+
+    private Map<String, String> before;
+    private Map<String, Object> after;
+    private Source source;
+    private TableChanges.TableChange tableChange;
+    private long tsMs;
+    private String op;
+
+    @Builder
+    @Data
+    public static class Source {
+        private String name;
+        private String db;
+        private String table;
+        private List<String> pkNames;
+        private Map<String, Integer> sqlType;
+        private Map<String, String> mysqlType;
+    }
+
+}