You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/26 09:57:23 UTC

[doris-flink-connector] branch master updated: support flink catalog create table (#147)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1adebb3  support flink catalog create table (#147)
1adebb3 is described below

commit 1adebb3128ebbb90629aeb2e849d3d74cc94d573
Author: wudi <67...@qq.com>
AuthorDate: Mon Jun 26 17:57:18 2023 +0800

    support flink catalog create table (#147)
    
    Co-authored-by: wudi <>
---
 .../apache/doris/flink/catalog/DorisCatalog.java   | 195 +++++++++---------
 .../doris/flink/catalog/DorisCatalogFactory.java   |  18 +-
 .../doris/flink/catalog/DorisCatalogOptions.java   |  18 +-
 .../doris/flink/catalog/DorisTypeMapper.java       | 218 ++++++++++++++++-----
 .../doris/flink/catalog/doris/DorisSystem.java     |  51 ++---
 .../DorisSystemException.java}                     |  33 +++-
 .../apache/doris/flink/catalog/CatalogTest.java    |  98 ++++++++-
 7 files changed, 445 insertions(+), 186 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index ef735e5..e495146 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -17,10 +17,14 @@
 package org.apache.doris.flink.catalog;
 
 import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.doris.flink.table.DorisDynamicTableFactory;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -57,21 +61,17 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.StringJoiner;
-import java.util.function.Predicate;
 
 import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
-import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.getCreateTableProps;
 import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
 import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
 import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
@@ -80,6 +80,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
 import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * catalog for flink
@@ -87,50 +88,23 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class DorisCatalog extends AbstractCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
-
-    private static final Set<String> builtinDatabases =
-            new HashSet<String>() {
-                {
-                    add("information_schema");
-                }
-            };
-
-    private final String username;
-    private final String password;
-    private final String jdbcUrl;
+    private DorisSystem dorisSystem;
+    private DorisConnectionOptions connectionOptions;
     private final Map<String, String> properties;
 
     public DorisCatalog(
             String catalogName,
-            String jdbcUrl,
+            DorisConnectionOptions connectionOptions,
             String defaultDatabase,
-            String username,
-            String password,
             Map<String, String> properties) {
         super(catalogName, defaultDatabase);
-
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(jdbcUrl), "jdbc-url cannot be null or empty");
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
-
-        this.jdbcUrl = jdbcUrl.endsWith("/") ? jdbcUrl : jdbcUrl + "/";
-        ;
-        this.username = username;
-        this.password = password;
+        this.connectionOptions = connectionOptions;
         this.properties = Collections.unmodifiableMap(properties);
     }
 
     @Override
     public void open() throws CatalogException {
-        // test connection, fail early if we cannot connect to database
-        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
-        } catch (SQLException e) {
-            throw new ValidationException(
-                    String.format("Failed connecting to %s via JDBC.", jdbcUrl), e);
-        }
-
-        LOG.info("Catalog {} established connection to {}", getName(), jdbcUrl);
+        dorisSystem = new DorisSystem(connectionOptions);
     }
 
     @Override
@@ -151,17 +125,13 @@ public class DorisCatalog extends AbstractCatalog {
 
     @Override
     public List<String> listDatabases() throws CatalogException {
-        return extractColumnValuesBySQL(
-                jdbcUrl,
-                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
-                1,
-                dbName -> !builtinDatabases.contains(dbName));
+        return dorisSystem.listDatabases();
     }
 
     @Override
     public CatalogDatabase getDatabase(String databaseName)
             throws DatabaseNotExistException, CatalogException {
-        if (listDatabases().contains(databaseName)) {
+        if (databaseExists(databaseName)) {
             return new CatalogDatabaseImpl(Collections.emptyMap(), null);
         } else {
             throw new DatabaseNotExistException(getName(), databaseName);
@@ -171,20 +141,36 @@ public class DorisCatalog extends AbstractCatalog {
     @Override
     public boolean databaseExists(String databaseName) throws CatalogException {
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
         return listDatabases().contains(databaseName);
     }
 
     @Override
     public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        if(databaseExists(name)){
+            if(ignoreIfExists){
+                return;
+            }
+            throw new DatabaseAlreadyExistException(getName(), name);
+        }else {
+            dorisSystem.createDatabase(name);
+        }
     }
 
     @Override
     public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
-            throws DatabaseNotEmptyException, CatalogException {
-        throw new UnsupportedOperationException();
+            throws DatabaseNotEmptyException, CatalogException, DatabaseNotExistException {
+        if(!databaseExists(name)){
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new DatabaseNotExistException(getName(), name);
+        }
+
+        if (!cascade && listTables(name).size() > 0) {
+            throw new DatabaseNotEmptyException(getName(),name);
+        }
+        dorisSystem.dropDatabase(name);
     }
 
     @Override
@@ -204,8 +190,7 @@ public class DorisCatalog extends AbstractCatalog {
             throw new DatabaseNotExistException(getName(), databaseName);
         }
 
-        return extractColumnValuesBySQL(
-                jdbcUrl + databaseName,
+        return dorisSystem.extractColumnValuesBySQL(
                 "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
                 1,
                 null,
@@ -231,14 +216,13 @@ public class DorisCatalog extends AbstractCatalog {
         if (!props.containsKey(FENODES.key())) {
             props.put(FENODES.key(), queryFenodes());
         }
-        props.put(USERNAME.key(), username);
-        props.put(PASSWORD.key(), password);
+        props.put(USERNAME.key(), connectionOptions.getUsername());
+        props.put(PASSWORD.key(), connectionOptions.getPassword());
         props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName);
 
         String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(),"");
         props.put(SINK_LABEL_PREFIX.key(), String.join("_",labelPrefix,databaseName,tableName));
         //remove catalog option
-        props.remove(JDBCURL.key());
         props.remove(DEFAULT_DATABASE.key());
         return CatalogTable.of(createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props);
 
@@ -246,7 +230,9 @@ public class DorisCatalog extends AbstractCatalog {
 
     @VisibleForTesting
     protected String queryFenodes() {
-        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
+        try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
+                connectionOptions.getUsername(),
+                connectionOptions.getPassword())) {
             StringJoiner fenodes = new StringJoiner(",");
             PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS");
             ResultSet resultSet = ps.executeQuery();
@@ -262,8 +248,9 @@ public class DorisCatalog extends AbstractCatalog {
     }
 
     private Schema createTableSchema(String databaseName, String tableName) {
-        String dbUrl = jdbcUrl + databaseName;
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) {
+        try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
+                connectionOptions.getUsername(),
+                connectionOptions.getPassword())) {
             PreparedStatement ps =
                     conn.prepareStatement(
                             String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName));
@@ -276,6 +263,7 @@ public class DorisCatalog extends AbstractCatalog {
                 String columnType = resultSet.getString("DATA_TYPE");
                 long columnSize = resultSet.getLong("COLUMN_SIZE");
                 long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
+
                 DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit);
                 columnNames.add(columnName);
                 columnTypes.add(flinkType);
@@ -302,7 +290,14 @@ public class DorisCatalog extends AbstractCatalog {
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        if(!tableExists(tablePath)){
+            if(ignoreIfNotExists){
+                return;
+            }
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        dorisSystem.dropTable(String.format("%s.%s", tablePath.getDatabaseName(), tablePath.getObjectName()));
     }
 
     @Override
@@ -314,7 +309,56 @@ public class DorisCatalog extends AbstractCatalog {
     @Override
     public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        checkNotNull(tablePath, "tablePath cannot be null");
+        checkNotNull(table, "table cannot be null");
+
+        if(!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+        }
+        if(tableExists(tablePath)){
+            if(ignoreIfExists){
+                return;
+            }
+            throw new TableAlreadyExistException(getName(), tablePath);
+        }
+
+        Map<String, String> options = table.getOptions();
+        if (!IDENTIFIER.equals(options.get(CONNECTOR.key()))) {
+            return;
+        }
+
+        List<String> primaryKeys = getCreateDorisKeys(table.getSchema());
+        TableSchema schema = new TableSchema();
+        schema.setDatabase(tablePath.getDatabaseName());
+        schema.setTable(tablePath.getObjectName());
+        schema.setTableComment(table.getComment());
+        schema.setFields(getCreateDorisColumns(table.getSchema()));
+        schema.setKeys(primaryKeys);
+        schema.setModel(DataModel.UNIQUE);
+        schema.setDistributeKeys(primaryKeys);
+        schema.setProperties(getCreateTableProps(options));
+
+        dorisSystem.createTable(schema);
+    }
+
+    /**
+     * Returns the column names of the primary key declared on the given Flink schema.
+     *
+     * @param schema Flink table schema to read the primary key from
+     * @return the primary-key column names, as reported by the schema's constraint
+     * @throws IllegalStateException if the schema declares no primary key
+     */
+    public List<String> getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema){
+        return schema.getPrimaryKey()
+                .orElseThrow(() -> new IllegalStateException("primary key cannot be null"))
+                .getColumns();
+    }
+
+    /**
+     * Builds the Doris column definitions for every field of the given Flink schema.
+     *
+     * <p>Each field name is paired with the Doris type string produced by
+     * {@link DorisTypeMapper#toDorisType(DataType)}; the third {@code FieldSchema}
+     * constructor argument is left {@code null}.
+     *
+     * @param schema Flink table schema whose fields are converted
+     * @return an insertion-ordered map from field name to its {@code FieldSchema},
+     *         in the order the schema reports its field names
+     */
+    public Map<String, FieldSchema> getCreateDorisColumns(org.apache.flink.table.api.TableSchema schema){
+        final String[] names = schema.getFieldNames();
+        final DataType[] types = schema.getFieldDataTypes();
+        final Map<String, FieldSchema> columns = new LinkedHashMap<>();
+
+        // names and types are parallel arrays; walk them in lock-step.
+        int position = 0;
+        for (String name : names) {
+            String dorisType = DorisTypeMapper.toDorisType(types[position++]);
+            columns.put(name, new FieldSchema(name, dorisType, null));
+        }
+        return columns;
     }
 
     @Override
@@ -490,37 +534,4 @@ public class DorisCatalog extends AbstractCatalog {
             throws PartitionNotExistException, CatalogException {
         throw new UnsupportedOperationException();
     }
-
-
-    private List<String> extractColumnValuesBySQL(
-            String connUrl,
-            String sql,
-            int columnIndex,
-            Predicate<String> filterFunc,
-            Object... params) {
-
-        List<String> columnValues = Lists.newArrayList();
-
-        try (Connection conn = DriverManager.getConnection(connUrl, username, password);
-             PreparedStatement ps = conn.prepareStatement(sql)) {
-            if (Objects.nonNull(params) && params.length > 0) {
-                for (int i = 0; i < params.length; i++) {
-                    ps.setObject(i + 1, params[i]);
-                }
-            }
-            ResultSet rs = ps.executeQuery();
-            while (rs.next()) {
-                String columnValue = rs.getString(columnIndex);
-                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
-                    columnValues.add(columnValue);
-                }
-            }
-            return columnValues;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format(
-                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
-                    e);
-        }
-    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index 9a80370..958ce3d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.doris.flink.catalog;
 
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.Catalog;
@@ -26,7 +27,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
-import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
@@ -67,7 +68,7 @@ public class DorisCatalogFactory implements CatalogFactory {
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(JDBCURL);
+        options.add(JDBC_URL);
         options.add(USERNAME);
         options.add(PASSWORD);
         return options;
@@ -76,7 +77,7 @@ public class DorisCatalogFactory implements CatalogFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(JDBCURL);
+        options.add(JDBC_URL);
         options.add(DEFAULT_DATABASE);
 
         options.add(FENODES);
@@ -115,12 +116,17 @@ public class DorisCatalogFactory implements CatalogFactory {
                 FactoryUtil.createCatalogFactoryHelper(this, context);
         helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
 
+        DorisConnectionOptions connectionOptions =
+                new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+                        .withFenodes(helper.getOptions().get(FENODES))
+                        .withJdbcUrl(helper.getOptions().get(JDBC_URL))
+                        .withUsername(helper.getOptions().get(USERNAME))
+                        .withPassword(helper.getOptions().get(PASSWORD))
+                        .build();
         return new DorisCatalog(
                 context.getName(),
-                helper.getOptions().get(JDBCURL),
+                connectionOptions,
                 helper.getOptions().get(DEFAULT_DATABASE),
-                helper.getOptions().get(USERNAME),
-                helper.getOptions().get(PASSWORD),
                 ((Configuration) helper.getOptions()).toMap());
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
index ff87f9b..cc5772e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
@@ -20,7 +20,23 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class DorisCatalogOptions {
-    public static final ConfigOption<String> JDBCURL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url.");
     public static final ConfigOption<String> DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+
+    public static final String TABLE_PROPERTIES_PREFIX = "table.properties.";
+
+    /**
+     * Extracts the options whose key starts with {@link #TABLE_PROPERTIES_PREFIX} and
+     * returns them with that prefix removed from the key.
+     *
+     * @param tableOptions options to scan; entries whose key lacks the prefix are ignored
+     * @return a new map from the un-prefixed key to the original option value
+     */
+    public static Map<String, String> getCreateTableProps(Map<String, String> tableOptions) {
+        final int prefixLength = TABLE_PROPERTIES_PREFIX.length();
+        final Map<String, String> stripped = new HashMap<>();
+        tableOptions.forEach((key, value) -> {
+            if (!key.startsWith(TABLE_PROPERTIES_PREFIX)) {
+                return;
+            }
+            stripped.put(key.substring(prefixLength), value);
+        });
+        return stripped;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index 372b9b2..09bb492 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -16,83 +16,87 @@
 // under the License.
 package org.apache.doris.flink.catalog;
 
+import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
-public class DorisTypeMapper {
-
-    // -------------------------number----------------------------
-    private static final String DORIS_TINYINT = "TINYINT";
-    private static final String DORIS_SMALLINT = "SMALLINT";
-    private static final String DORIS_INT = "INT";
-    private static final String DORIS_BIGINT = "BIGINT";
-    private static final String DORIS_LARGEINT = "BIGINT UNSIGNED";
-    private static final String DORIS_DECIMAL = "DECIMAL";
-    private static final String DORIS_DECIMALV2 = "DECIMALV2";
-    private static final String DORIS_DECIMAL32 = "DECIMAL32";
-    private static final String DORIS_DECIMAL64 = "DECIMAL64";
-    private static final String DORIS_DECIMAL128I = "DECIMAL128I";
-    private static final String DORIS_FLOAT = "FLOAT";
-    private static final String DORIS_DOUBLE = "DOUBLE";
-
-    // -------------------------string----------------------------
-    private static final String DORIS_CHAR = "CHAR";
-    private static final String DORIS_VARCHAR = "VARCHAR";
-    private static final String DORIS_STRING = "STRING";
-    private static final String DORIS_TEXT = "TEXT";
-    private static final String DORIS_JSONB = "JSONB";
-
-    // ------------------------------time-------------------------
-    private static final String DORIS_DATE = "DATE";
-    private static final String DORIS_DATEV2 = "DATEV2";
-    private static final String DORIS_DATETIME = "DATETIME";
-    private static final String DORIS_DATETIMEV2 = "DATETIMEV2";
-
-    //------------------------------bool------------------------
-    private static final String DORIS_BOOLEAN = "BOOLEAN";
+import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
+import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATE;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME_V2;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATE_V2;
+import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL;
+import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
+import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
+import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
+import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
+import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
+import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
 
+public class DorisTypeMapper {
 
     public static DataType toFlinkType(String columnName, String columnType, int precision, int scale) {
         columnType = columnType.toUpperCase();
         switch (columnType) {
-            case DORIS_BOOLEAN:
+            case BOOLEAN:
                 return DataTypes.BOOLEAN();
-            case DORIS_TINYINT:
+            case TINYINT:
                 if (precision == 0) {
                     //The boolean type will become tinyint when queried in information_schema, and precision=0
                     return DataTypes.BOOLEAN();
                 } else {
                     return DataTypes.TINYINT();
                 }
-            case DORIS_SMALLINT:
+            case SMALLINT:
                 return DataTypes.SMALLINT();
-            case DORIS_INT:
+            case INT:
                 return DataTypes.INT();
-            case DORIS_BIGINT:
+            case BIGINT:
                 return DataTypes.BIGINT();
-            case DORIS_DECIMAL:
-            case DORIS_DECIMALV2:
-            case DORIS_DECIMAL32:
-            case DORIS_DECIMAL64:
-            case DORIS_DECIMAL128I:
+            case DECIMAL:
+            case DECIMAL_V3:
                 return DataTypes.DECIMAL(precision, scale);
-            case DORIS_FLOAT:
+            case FLOAT:
                 return DataTypes.FLOAT();
-            case DORIS_DOUBLE:
+            case DOUBLE:
                 return DataTypes.DOUBLE();
-            case DORIS_CHAR:
+            case CHAR:
                 return DataTypes.CHAR(precision);
-            case DORIS_LARGEINT:
-            case DORIS_VARCHAR:
-            case DORIS_STRING:
-            case DORIS_TEXT:
-            case DORIS_JSONB:
+            case VARCHAR:
+                return DataTypes.VARCHAR(precision);
+            case LARGEINT:
+            case STRING:
+            case JSONB:
                 return DataTypes.STRING();
-            case DORIS_DATE:
-            case DORIS_DATEV2:
+            case DATE:
+            case DATE_V2:
                 return DataTypes.DATE();
-            case DORIS_DATETIME:
-            case DORIS_DATETIMEV2:
+            case DATETIME:
+            case DATETIME_V2:
                 return DataTypes.TIMESTAMP(0);
             default:
                 throw new UnsupportedOperationException(
@@ -100,4 +104,112 @@ public class DorisTypeMapper {
                                 "Doesn't support Doris type '%s' on column '%s'", columnType, columnName));
         }
     }
+
+    /**
+     * Converts a Flink {@link DataType} into a Doris column type string.
+     *
+     * <p>The conversion is delegated to {@link LogicalTypeVisitor}, which is dispatched on
+     * the data type's logical type.
+     *
+     * @param flinkType Flink data type to convert
+     * @return the Doris type string chosen by the visitor
+     * @throws UnsupportedOperationException if the visitor has no mapping for the logical type
+     */
+    public static String toDorisType(DataType flinkType){
+        final LogicalType root = flinkType.getLogicalType();
+        final LogicalTypeVisitor visitor = new LogicalTypeVisitor(root);
+        return root.accept(visitor);
+    }
+
+    /**
+     * Visitor that renders a Flink {@link LogicalType} as a Doris column type string.
+     *
+     * <p>Logical types without an explicit {@code visit} override end up in
+     * {@link #defaultMethod(LogicalType)}, which rejects them.
+     */
+    private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor<String> {
+        /** Largest length Doris accepts for a CHAR column. */
+        private static final int MAX_CHAR_LENGTH = 255;
+        /** Largest length emitted as VARCHAR; longer strings are mapped to STRING. */
+        private static final int MAX_VARCHAR_LENGTH = 65533;
+        /** Largest decimal precision emitted as a decimal type; wider ones map to STRING. */
+        private static final int MAX_DECIMAL_PRECISION = 38;
+        /** Upper bound applied to a timestamp's precision before it is emitted. */
+        private static final int MAX_DATETIME_PRECISION = 6;
+
+        /** The type the visitor was created for; only used in the unsupported-type message. */
+        private final LogicalType type;
+
+        LogicalTypeVisitor(LogicalType type) {
+            this.type = type;
+        }
+
+        @Override
+        public String visit(CharType charType) {
+            int length = charType.getLength();
+            // Doris rejects CHAR columns longer than 255, so a longer fixed-length string is
+            // emitted as VARCHAR/STRING instead of producing DDL that cannot be executed.
+            // NOTE(review): Doris lengths are byte-based while Flink counts characters;
+            // multi-byte data may need a wider column - confirm whether scaling is wanted.
+            return length <= MAX_CHAR_LENGTH
+                    ? String.format("%s(%s)", CHAR, length)
+                    : variableLengthType(length);
+        }
+
+        @Override
+        public String visit(VarCharType varCharType) {
+            return variableLengthType(varCharType.getLength());
+        }
+
+        @Override
+        public String visit(BooleanType booleanType) {
+            return BOOLEAN;
+        }
+
+        @Override
+        public String visit(VarBinaryType varBinaryType) {
+            return STRING;
+        }
+
+        @Override
+        public String visit(DecimalType decimalType) {
+            int precision = decimalType.getPrecision();
+            if (precision > MAX_DECIMAL_PRECISION) {
+                return STRING;
+            }
+            // A negative scale is emitted as 0.
+            int scale = Math.max(decimalType.getScale(), 0);
+            return String.format("%s(%s,%s)", DECIMAL_V3, precision, scale);
+        }
+
+        @Override
+        public String visit(TinyIntType tinyIntType) {
+            return TINYINT;
+        }
+
+        @Override
+        public String visit(SmallIntType smallIntType) {
+            return SMALLINT;
+        }
+
+        @Override
+        public String visit(IntType intType) {
+            return INT;
+        }
+
+        @Override
+        public String visit(BigIntType bigIntType) {
+            return BIGINT;
+        }
+
+        @Override
+        public String visit(FloatType floatType) {
+            return FLOAT;
+        }
+
+        @Override
+        public String visit(DoubleType doubleType) {
+            return DOUBLE;
+        }
+
+        @Override
+        public String visit(DateType dateType) {
+            return DATE_V2;
+        }
+
+        @Override
+        public String visit(TimestampType timestampType) {
+            // Clamp the precision into [0, MAX_DATETIME_PRECISION] before emitting it.
+            int precision =
+                    Math.min(Math.max(timestampType.getPrecision(), 0), MAX_DATETIME_PRECISION);
+            return String.format("%s(%s)", DATETIME_V2, precision);
+        }
+
+        @Override
+        public String visit(ArrayType arrayType) {
+            return STRING;
+        }
+
+        @Override
+        public String visit(MapType mapType) {
+            return STRING;
+        }
+
+        @Override
+        public String visit(RowType rowType) {
+            return STRING;
+        }
+
+        @Override
+        protected String defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Flink doesn't support converting type %s to Doris type yet.",
+                            type.toString()));
+        }
+
+        /** Renders a character length as VARCHAR(n), or STRING when n exceeds the VARCHAR limit. */
+        private static String variableLengthType(int length) {
+            return length > MAX_VARCHAR_LENGTH
+                    ? STRING
+                    : String.format("%s(%s)", VARCHAR, length);
+        }
+    }
+
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 36aba1c..0a0f5e9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -23,14 +23,13 @@ import org.apache.doris.flink.connection.JdbcConnectionProvider;
 import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
 import org.apache.doris.flink.exception.CreateTableException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.DorisSystemException;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;
@@ -56,33 +55,34 @@ public class DorisSystem {
         this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
     }
 
-    public List<String> listDatabases() throws Exception {
+    public List<String> listDatabases() {
         return extractColumnValuesBySQL(
                 "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
                 1,
                 dbName -> !builtinDatabases.contains(dbName));
     }
 
-    public boolean databaseExists(String database) throws Exception {
+    public boolean databaseExists(String database) {
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
         return listDatabases().contains(database);
     }
 
-    public boolean createDatabase(String database) throws Exception {
-        execute(String.format("CREATE DATABASE %s", database));
+    public boolean createDatabase(String database) {
+        execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
+        return true;
+    }
+
+    public boolean dropDatabase(String database) {
+        execute(String.format("DROP DATABASE IF EXISTS %s", database));
         return true;
     }
 
     public boolean tableExists(String database, String table){
-        try {
-            return databaseExists(database)
-                    && listTables(database).contains(table);
-        } catch (Exception e) {
-            return false;
-        }
+        return databaseExists(database)
+                && listTables(database).contains(table);
     }
 
-    public List<String> listTables(String databaseName) throws Exception {
+    public List<String> listTables(String databaseName) {
         if (!databaseExists(databaseName)) {
             throw new DorisRuntimeException("database" + databaseName + " is not exists");
         }
@@ -93,29 +93,32 @@ public class DorisSystem {
                 databaseName);
     }
 
-    public void createTable(TableSchema schema) throws Exception {
+    public void dropTable(String tableName) {
+        execute(String.format("DROP TABLE IF EXISTS %s", tableName));
+    }
+
+    public void createTable(TableSchema schema) {
         String ddl = buildCreateTableDDL(schema);
         LOG.info("Create table with ddl:{}", ddl);
         execute(ddl);
     }
 
-    public void execute(String sql) throws Exception {
-        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
-        try (Statement statement = conn.createStatement()) {
+    public void execute(String sql) {
+        try (Statement statement = jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
             statement.execute(sql);
+        } catch (Exception e){
+            throw new DorisSystemException(String.format("SQL query could not be executed: %s", sql), e);
         }
     }
 
-    private List<String> extractColumnValuesBySQL(
+    public List<String> extractColumnValuesBySQL(
             String sql,
             int columnIndex,
             Predicate<String> filterFunc,
-            Object... params) throws Exception {
+            Object... params) {
 
-        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
         List<String> columnValues = Lists.newArrayList();
-
-        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+        try (PreparedStatement ps = jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
             if (Objects.nonNull(params) && params.length > 0) {
                 for (int i = 0; i < params.length; i++) {
                     ps.setObject(i + 1, params[i]);
@@ -130,7 +133,7 @@ public class DorisSystem {
             }
             return columnValues;
         } catch (Exception e) {
-            throw new CatalogException(
+            throw new DorisSystemException(
                     String.format(
                             "The following SQL query could not be executed: %s", sql),
                     e);
@@ -210,7 +213,7 @@ public class DorisSystem {
                 .append(" ")
                 .append(field.getTypeString())
                 .append(" COMMENT '")
-                .append(field.getComment())
+                .append(field.getComment() == null ? "" : field.getComment())
                 .append("',");
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
similarity index 52%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
index ff87f9b..eec2922 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
@@ -14,13 +14,32 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.catalog;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
+package org.apache.doris.flink.exception;
 
-public class DorisCatalogOptions {
-    public static final ConfigOption<String> JDBCURL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url.");
-    public static final ConfigOption<String> DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+/**
+ * Unchecked exception signalling that a {@code DorisSystem} operation, such as executing a SQL statement over JDBC, failed.
+ */
+public class DorisSystemException extends RuntimeException {
+    public DorisSystemException() {
+        super();
+    }
+
+    public DorisSystemException(String message) {
+        super(message);
+    }
+
+    public DorisSystemException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DorisSystemException(Throwable cause) {
+        super(cause);
+    }
+
+    protected DorisSystemException(String message, Throwable cause,
+                                   boolean enableSuppression,
+                                   boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
index e1452d3..6edfd50 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
@@ -17,15 +17,24 @@
 
 package org.apache.doris.flink.catalog;
 
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
@@ -52,10 +61,10 @@ import static org.junit.Assert.assertTrue;
 public class CatalogTest {
     private static final String TEST_CATALOG_NAME = "doris_catalog";
     private static final String TEST_FENODES = "127.0.0.1:8030";
-    private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1.78:9030";
+    private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030";
     private static final String TEST_USERNAME = "root";
     private static final String TEST_PWD = "";
-    private static final String TEST_DB = "test";
+    private static final String TEST_DB = "db1";
     private static final String TEST_TABLE = "t_all_types";
     private static final String TEST_TABLE_SINK = "t_all_types_sink";
     private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby";
@@ -78,6 +87,25 @@ public class CatalogTest {
                     .column("c_tinyint", DataTypes.TINYINT())
                     .build();
 
+    protected static final TableSchema TABLE_SCHEMA_1 =
+            TableSchema.builder()
+                    .field("id", new AtomicDataType(new VarCharType(false,128)))
+                    .field("c_boolean", DataTypes.BOOLEAN())
+                    .field("c_char", DataTypes.CHAR(1))
+                    .field("c_date", DataTypes.DATE())
+                    .field("c_datetime", DataTypes.TIMESTAMP(0))
+                    .field("c_decimal", DataTypes.DECIMAL(10, 2))
+                    .field("c_double", DataTypes.DOUBLE())
+                    .field("c_float", DataTypes.FLOAT())
+                    .field("c_int", DataTypes.INT())
+                    .field("c_bigint", DataTypes.BIGINT())
+                    .field("c_largeint", DataTypes.STRING())
+                    .field("c_smallint", DataTypes.SMALLINT())
+                    .field("c_string", DataTypes.STRING())
+                    .field("c_tinyint", DataTypes.TINYINT())
+                    .primaryKey("id")
+                    .build();
+
     private static final List<Row> ALL_TYPES_ROWS =
             Lists.newArrayList(
                     Row.ofKind(
@@ -118,9 +146,17 @@ public class CatalogTest {
 
     @Before
     public void setup() {
+        DorisConnectionOptions connectionOptions =
+                new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+                        .withFenodes(TEST_FENODES)
+                        .withJdbcUrl(TEST_JDBCURL)
+                        .withUsername(TEST_USERNAME)
+                        .withPassword(TEST_PWD)
+                        .build();
+
         Map<String,String> props = new HashMap<>();
         props.put("sink.enable-2pc","false");
-        catalog = new DorisCatalog(TEST_CATALOG_NAME, TEST_JDBCURL, TEST_DB, TEST_USERNAME, TEST_PWD, props);
+        catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         // Use doris catalog.
@@ -147,6 +183,18 @@ public class CatalogTest {
         assertTrue(catalog.databaseExists(TEST_DB));
     }
 
+    @Test
+    public void testCreateDb() throws Exception {
+        catalog.createDatabase(TEST_DB, createDb(), true); // ignoreIfExists=true: does not fail when the database is already present
+        assertTrue(catalog.databaseExists(TEST_DB));
+    }
+
+    @Test
+    public void testDropDb() throws Exception {
+        catalog.dropDatabase(TEST_DB, false); // NOTE(review): drops the shared TEST_DB that other tests read - confirm fixtures/test ordering
+        assertFalse(catalog.databaseExists(TEST_DB));
+    }
+
     @Test
     public void testListTables() throws DatabaseNotExistException {
         List<String> actual = catalog.listTables(TEST_DB);
@@ -165,12 +213,34 @@ public class CatalogTest {
     }
 
     @Test
+    @Ignore
     public void testGetTable() throws TableNotExistException {
+        //todo: string varchar mapping
         CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE));
         System.out.println(table);
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    @Ignore
+    public void testCreateTable() throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException {
+        //todo: Record primary key not null information
+        ObjectPath tablePath = new ObjectPath(TEST_DB, TEST_TABLE);
+        catalog.dropTable(tablePath, true);
+        catalog.createTable(tablePath, createTable(), true);
+        CatalogBaseTable tableGet = catalog.getTable(tablePath);
+        System.out.println(tableGet.getUnresolvedSchema());
+        System.out.println(TABLE_SCHEMA_1);
+        assertEquals(TABLE_SCHEMA_1, tableGet.getUnresolvedSchema());
+    }
+
+    @Test
+    public void testDropTable() throws TableNotExistException {
+        catalog.dropTable(new ObjectPath("db1", "tbl1"), true);
+        assertFalse(catalog.tableExists(new ObjectPath("db1", "tbl1")));
+    }
+
+
     // ------ test select query. ------
 
     @Test
@@ -260,4 +330,26 @@ public class CatalogTest {
                                 .collect());
         assertEquals(Lists.newArrayList(Row.ofKind(RowKind.INSERT, "catalog","100002")), results);
     }
+
+    /**
+     * Builds the database definition (single property k1=v1, empty comment) used by testCreateDb.
+     * A plain HashMap is used instead of double-brace initialization, which creates an anonymous subclass.
+     */
+    private static CatalogDatabase createDb() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("k1", "v1");
+        return new CatalogDatabaseImpl(properties, "");
+    }
+
+    /**
+     * Builds the table definition used by testCreateTable: TABLE_SCHEMA_1 with the doris
+     * connector option and a replication_num of 1, commented "FlinkTable".
+     * A plain HashMap replaces double-brace initialization (which creates an anonymous subclass).
+     */
+    private static CatalogTable createTable() {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "doris");
+        options.put("table.properties.replication_num", "1");
+        return new CatalogTableImpl(TABLE_SCHEMA_1, options, "FlinkTable");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org