You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/26 09:57:23 UTC
[doris-flink-connector] branch master updated: support flink catalog create table (#147)
This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1adebb3 support flink catalog create table (#147)
1adebb3 is described below
commit 1adebb3128ebbb90629aeb2e849d3d74cc94d573
Author: wudi <67...@qq.com>
AuthorDate: Mon Jun 26 17:57:18 2023 +0800
support flink catalog create table (#147)
Co-authored-by: wudi <>
---
.../apache/doris/flink/catalog/DorisCatalog.java | 195 +++++++++---------
.../doris/flink/catalog/DorisCatalogFactory.java | 18 +-
.../doris/flink/catalog/DorisCatalogOptions.java | 18 +-
.../doris/flink/catalog/DorisTypeMapper.java | 218 ++++++++++++++++-----
.../doris/flink/catalog/doris/DorisSystem.java | 51 ++---
.../DorisSystemException.java} | 33 +++-
.../apache/doris/flink/catalog/CatalogTest.java | 98 ++++++++-
7 files changed, 445 insertions(+), 186 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index ef735e5..e495146 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -17,10 +17,14 @@
package org.apache.doris.flink.catalog;
import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.table.DorisDynamicTableFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -57,21 +61,17 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.StringJoiner;
-import java.util.function.Predicate;
import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
-import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.getCreateTableProps;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
@@ -80,6 +80,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* catalog for flink
@@ -87,50 +88,23 @@ import static org.apache.flink.util.Preconditions.checkArgument;
public class DorisCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
-
- private static final Set<String> builtinDatabases =
- new HashSet<String>() {
- {
- add("information_schema");
- }
- };
-
- private final String username;
- private final String password;
- private final String jdbcUrl;
+ private DorisSystem dorisSystem;
+ private DorisConnectionOptions connectionOptions;
private final Map<String, String> properties;
public DorisCatalog(
String catalogName,
- String jdbcUrl,
+ DorisConnectionOptions connectionOptions,
String defaultDatabase,
- String username,
- String password,
Map<String, String> properties) {
super(catalogName, defaultDatabase);
-
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(jdbcUrl), "jdbc-url cannot be null or empty");
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
-
- this.jdbcUrl = jdbcUrl.endsWith("/") ? jdbcUrl : jdbcUrl + "/";
- ;
- this.username = username;
- this.password = password;
+ this.connectionOptions = connectionOptions;
this.properties = Collections.unmodifiableMap(properties);
}
@Override
public void open() throws CatalogException {
- // test connection, fail early if we cannot connect to database
- try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
- } catch (SQLException e) {
- throw new ValidationException(
- String.format("Failed connecting to %s via JDBC.", jdbcUrl), e);
- }
-
- LOG.info("Catalog {} established connection to {}", getName(), jdbcUrl);
+ dorisSystem = new DorisSystem(connectionOptions);
}
@Override
@@ -151,17 +125,13 @@ public class DorisCatalog extends AbstractCatalog {
@Override
public List<String> listDatabases() throws CatalogException {
- return extractColumnValuesBySQL(
- jdbcUrl,
- "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
- 1,
- dbName -> !builtinDatabases.contains(dbName));
+ return dorisSystem.listDatabases();
}
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
- if (listDatabases().contains(databaseName)) {
+ if (databaseExists(databaseName)) {
return new CatalogDatabaseImpl(Collections.emptyMap(), null);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
@@ -171,20 +141,36 @@ public class DorisCatalog extends AbstractCatalog {
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
return listDatabases().contains(databaseName);
}
@Override
public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
- throw new UnsupportedOperationException();
+ if(databaseExists(name)){
+ if(ignoreIfExists){
+ return;
+ }
+ throw new DatabaseAlreadyExistException(getName(), name);
+ }else {
+ dorisSystem.createDatabase(name);
+ }
}
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
- throws DatabaseNotEmptyException, CatalogException {
- throw new UnsupportedOperationException();
+ throws DatabaseNotEmptyException, CatalogException, DatabaseNotExistException {
+ if(!databaseExists(name)){
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new DatabaseNotExistException(getName(), name);
+ }
+
+ if (!cascade && listTables(name).size() > 0) {
+ throw new DatabaseNotEmptyException(getName(),name);
+ }
+ dorisSystem.dropDatabase(name);
}
@Override
@@ -204,8 +190,7 @@ public class DorisCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), databaseName);
}
- return extractColumnValuesBySQL(
- jdbcUrl + databaseName,
+ return dorisSystem.extractColumnValuesBySQL(
"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
1,
null,
@@ -231,14 +216,13 @@ public class DorisCatalog extends AbstractCatalog {
if (!props.containsKey(FENODES.key())) {
props.put(FENODES.key(), queryFenodes());
}
- props.put(USERNAME.key(), username);
- props.put(PASSWORD.key(), password);
+ props.put(USERNAME.key(), connectionOptions.getUsername());
+ props.put(PASSWORD.key(), connectionOptions.getPassword());
props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName);
String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(),"");
props.put(SINK_LABEL_PREFIX.key(), String.join("_",labelPrefix,databaseName,tableName));
//remove catalog option
- props.remove(JDBCURL.key());
props.remove(DEFAULT_DATABASE.key());
return CatalogTable.of(createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props);
@@ -246,7 +230,9 @@ public class DorisCatalog extends AbstractCatalog {
@VisibleForTesting
protected String queryFenodes() {
- try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
+ try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
+ connectionOptions.getUsername(),
+ connectionOptions.getPassword())) {
StringJoiner fenodes = new StringJoiner(",");
PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS");
ResultSet resultSet = ps.executeQuery();
@@ -262,8 +248,9 @@ public class DorisCatalog extends AbstractCatalog {
}
private Schema createTableSchema(String databaseName, String tableName) {
- String dbUrl = jdbcUrl + databaseName;
- try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) {
+ try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
+ connectionOptions.getUsername(),
+ connectionOptions.getPassword())) {
PreparedStatement ps =
conn.prepareStatement(
String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName));
@@ -276,6 +263,7 @@ public class DorisCatalog extends AbstractCatalog {
String columnType = resultSet.getString("DATA_TYPE");
long columnSize = resultSet.getLong("COLUMN_SIZE");
long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
+
DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit);
columnNames.add(columnName);
columnTypes.add(flinkType);
@@ -302,7 +290,14 @@ public class DorisCatalog extends AbstractCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ if(!tableExists(tablePath)){
+ if(ignoreIfNotExists){
+ return;
+ }
+ throw new TableNotExistException(getName(), tablePath);
+ }
+
+ dorisSystem.dropTable(String.format("%s.%s", tablePath.getDatabaseName(), tablePath.getObjectName()));
}
@Override
@@ -314,7 +309,56 @@ public class DorisCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "tablePath cannot be null");
+ checkNotNull(table, "table cannot be null");
+
+ if(!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+ }
+ if(tableExists(tablePath)){
+ if(ignoreIfExists){
+ return;
+ }
+ throw new TableAlreadyExistException(getName(), tablePath);
+ }
+
+ Map<String, String> options = table.getOptions();
+ if (!IDENTIFIER.equals(options.get(CONNECTOR.key()))) {
+ return;
+ }
+
+ List<String> primaryKeys = getCreateDorisKeys(table.getSchema());
+ TableSchema schema = new TableSchema();
+ schema.setDatabase(tablePath.getDatabaseName());
+ schema.setTable(tablePath.getObjectName());
+ schema.setTableComment(table.getComment());
+ schema.setFields(getCreateDorisColumns(table.getSchema()));
+ schema.setKeys(primaryKeys);
+ schema.setModel(DataModel.UNIQUE);
+ schema.setDistributeKeys(primaryKeys);
+ schema.setProperties(getCreateTableProps(options));
+
+ dorisSystem.createTable(schema);
+ }
+
+ public List<String> getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema){
+ Preconditions.checkState(schema.getPrimaryKey().isPresent(),"primary key cannot be null");
+ return schema.getPrimaryKey().get().getColumns();
+ }
+
+ public Map<String, FieldSchema> getCreateDorisColumns(org.apache.flink.table.api.TableSchema schema){
+ String[] fieldNames = schema.getFieldNames();
+ DataType[] fieldTypes = schema.getFieldDataTypes();
+
+ Map<String, FieldSchema> fields = new LinkedHashMap<>();
+ for (int i = 0; i < fieldNames.length; i++) {
+ fields.put(fieldNames[i],
+ new FieldSchema(
+ fieldNames[i],
+ DorisTypeMapper.toDorisType(fieldTypes[i]),
+ null));
+ }
+ return fields;
}
@Override
@@ -490,37 +534,4 @@ public class DorisCatalog extends AbstractCatalog {
throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
-
-
- private List<String> extractColumnValuesBySQL(
- String connUrl,
- String sql,
- int columnIndex,
- Predicate<String> filterFunc,
- Object... params) {
-
- List<String> columnValues = Lists.newArrayList();
-
- try (Connection conn = DriverManager.getConnection(connUrl, username, password);
- PreparedStatement ps = conn.prepareStatement(sql)) {
- if (Objects.nonNull(params) && params.length > 0) {
- for (int i = 0; i < params.length; i++) {
- ps.setObject(i + 1, params[i]);
- }
- }
- ResultSet rs = ps.executeQuery();
- while (rs.next()) {
- String columnValue = rs.getString(columnIndex);
- if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
- columnValues.add(columnValue);
- }
- }
- return columnValues;
- } catch (Exception e) {
- throw new CatalogException(
- String.format(
- "The following SQL query could not be executed (%s): %s", connUrl, sql),
- e);
- }
- }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index 9a80370..958ce3d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.flink.catalog;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
@@ -26,7 +27,7 @@ import java.util.HashSet;
import java.util.Set;
import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
-import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
@@ -67,7 +68,7 @@ public class DorisCatalogFactory implements CatalogFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(JDBCURL);
+ options.add(JDBC_URL);
options.add(USERNAME);
options.add(PASSWORD);
return options;
@@ -76,7 +77,7 @@ public class DorisCatalogFactory implements CatalogFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(JDBCURL);
+ options.add(JDBC_URL);
options.add(DEFAULT_DATABASE);
options.add(FENODES);
@@ -115,12 +116,17 @@ public class DorisCatalogFactory implements CatalogFactory {
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+ DorisConnectionOptions connectionOptions =
+ new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+ .withFenodes(helper.getOptions().get(FENODES))
+ .withJdbcUrl(helper.getOptions().get(JDBC_URL))
+ .withUsername(helper.getOptions().get(USERNAME))
+ .withPassword(helper.getOptions().get(PASSWORD))
+ .build();
return new DorisCatalog(
context.getName(),
- helper.getOptions().get(JDBCURL),
+ connectionOptions,
helper.getOptions().get(DEFAULT_DATABASE),
- helper.getOptions().get(USERNAME),
- helper.getOptions().get(PASSWORD),
((Configuration) helper.getOptions()).toMap());
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
index ff87f9b..cc5772e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
@@ -20,7 +20,23 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.CommonCatalogOptions;
+import java.util.HashMap;
+import java.util.Map;
+
public class DorisCatalogOptions {
- public static final ConfigOption<String> JDBCURL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url.");
public static final ConfigOption<String> DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+
+ public static final String TABLE_PROPERTIES_PREFIX = "table.properties.";
+
+ public static Map<String, String> getCreateTableProps(Map<String, String> tableOptions) {
+ final Map<String, String> tableProps = new HashMap<>();
+
+ for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+ if (entry.getKey().startsWith(TABLE_PROPERTIES_PREFIX)) {
+ String subKey = entry.getKey().substring(TABLE_PROPERTIES_PREFIX.length());
+ tableProps.put(subKey, entry.getValue());
+ }
+ }
+ return tableProps;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index 372b9b2..09bb492 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -16,83 +16,87 @@
// under the License.
package org.apache.doris.flink.catalog;
+import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-public class DorisTypeMapper {
-
- // -------------------------number----------------------------
- private static final String DORIS_TINYINT = "TINYINT";
- private static final String DORIS_SMALLINT = "SMALLINT";
- private static final String DORIS_INT = "INT";
- private static final String DORIS_BIGINT = "BIGINT";
- private static final String DORIS_LARGEINT = "BIGINT UNSIGNED";
- private static final String DORIS_DECIMAL = "DECIMAL";
- private static final String DORIS_DECIMALV2 = "DECIMALV2";
- private static final String DORIS_DECIMAL32 = "DECIMAL32";
- private static final String DORIS_DECIMAL64 = "DECIMAL64";
- private static final String DORIS_DECIMAL128I = "DECIMAL128I";
- private static final String DORIS_FLOAT = "FLOAT";
- private static final String DORIS_DOUBLE = "DOUBLE";
-
- // -------------------------string----------------------------
- private static final String DORIS_CHAR = "CHAR";
- private static final String DORIS_VARCHAR = "VARCHAR";
- private static final String DORIS_STRING = "STRING";
- private static final String DORIS_TEXT = "TEXT";
- private static final String DORIS_JSONB = "JSONB";
-
- // ------------------------------time-------------------------
- private static final String DORIS_DATE = "DATE";
- private static final String DORIS_DATEV2 = "DATEV2";
- private static final String DORIS_DATETIME = "DATETIME";
- private static final String DORIS_DATETIMEV2 = "DATETIMEV2";
-
- //------------------------------bool------------------------
- private static final String DORIS_BOOLEAN = "BOOLEAN";
+import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
+import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATE;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATETIME_V2;
+import static org.apache.doris.flink.catalog.doris.DorisType.DATE_V2;
+import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL;
+import static org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
+import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
+import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
+import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
+import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
+import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
+public class DorisTypeMapper {
public static DataType toFlinkType(String columnName, String columnType, int precision, int scale) {
columnType = columnType.toUpperCase();
switch (columnType) {
- case DORIS_BOOLEAN:
+ case BOOLEAN:
return DataTypes.BOOLEAN();
- case DORIS_TINYINT:
+ case TINYINT:
if (precision == 0) {
//The boolean type will become tinyint when queried in information_schema, and precision=0
return DataTypes.BOOLEAN();
} else {
return DataTypes.TINYINT();
}
- case DORIS_SMALLINT:
+ case SMALLINT:
return DataTypes.SMALLINT();
- case DORIS_INT:
+ case INT:
return DataTypes.INT();
- case DORIS_BIGINT:
+ case BIGINT:
return DataTypes.BIGINT();
- case DORIS_DECIMAL:
- case DORIS_DECIMALV2:
- case DORIS_DECIMAL32:
- case DORIS_DECIMAL64:
- case DORIS_DECIMAL128I:
+ case DECIMAL:
+ case DECIMAL_V3:
return DataTypes.DECIMAL(precision, scale);
- case DORIS_FLOAT:
+ case FLOAT:
return DataTypes.FLOAT();
- case DORIS_DOUBLE:
+ case DOUBLE:
return DataTypes.DOUBLE();
- case DORIS_CHAR:
+ case CHAR:
return DataTypes.CHAR(precision);
- case DORIS_LARGEINT:
- case DORIS_VARCHAR:
- case DORIS_STRING:
- case DORIS_TEXT:
- case DORIS_JSONB:
+ case VARCHAR:
+ return DataTypes.VARCHAR(precision);
+ case LARGEINT:
+ case STRING:
+ case JSONB:
return DataTypes.STRING();
- case DORIS_DATE:
- case DORIS_DATEV2:
+ case DATE:
+ case DATE_V2:
return DataTypes.DATE();
- case DORIS_DATETIME:
- case DORIS_DATETIMEV2:
+ case DATETIME:
+ case DATETIME_V2:
return DataTypes.TIMESTAMP(0);
default:
throw new UnsupportedOperationException(
@@ -100,4 +104,112 @@ public class DorisTypeMapper {
"Doesn't support Doris type '%s' on column '%s'", columnType, columnName));
}
}
+
+ public static String toDorisType(DataType flinkType){
+ LogicalType logicalType = flinkType.getLogicalType();
+ return logicalType.accept(new LogicalTypeVisitor(logicalType));
+ }
+
+ private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor<String> {
+ private final LogicalType type;
+
+ LogicalTypeVisitor(LogicalType type) {
+ this.type = type;
+ }
+
+ @Override
+ public String visit(CharType charType) {
+ return String.format("%s(%s)", DorisType.CHAR, charType.getLength());
+ }
+
+ @Override
+ public String visit(VarCharType varCharType) {
+ int length = varCharType.getLength();
+ return length > 65533 ? STRING : String.format("%s(%s)", VARCHAR, length);
+ }
+
+ @Override
+ public String visit(BooleanType booleanType) {
+ return BOOLEAN;
+ }
+
+ @Override
+ public String visit(VarBinaryType varBinaryType) {
+ return STRING;
+ }
+
+ @Override
+ public String visit(DecimalType decimalType) {
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ return precision <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ }
+
+ @Override
+ public String visit(TinyIntType tinyIntType) {
+ return TINYINT;
+ }
+
+ @Override
+ public String visit(SmallIntType smallIntType) {
+ return SMALLINT;
+ }
+
+ @Override
+ public String visit(IntType intType) {
+ return INT;
+ }
+
+ @Override
+ public String visit(BigIntType bigIntType) {
+ return BIGINT;
+ }
+
+ @Override
+ public String visit(FloatType floatType) {
+ return FLOAT;
+ }
+
+ @Override
+ public String visit(DoubleType doubleType) {
+ return DOUBLE;
+ }
+
+ @Override
+ public String visit(DateType dateType) {
+ return DATE_V2;
+ }
+
+ @Override
+ public String visit(TimestampType timestampType) {
+ int precision = timestampType.getPrecision();
+ return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(Math.max(precision, 0), 6));
+ }
+
+ @Override
+ public String visit(ArrayType arrayType) {
+ return STRING;
+ }
+
+ @Override
+ public String visit(MapType mapType) {
+ return STRING;
+ }
+
+ @Override
+ public String visit(RowType rowType) {
+ return STRING;
+ }
+
+ @Override
+ protected String defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Flink doesn't support converting type %s to Doris type yet.",
+ type.toString()));
+ }
+ }
+
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 36aba1c..0a0f5e9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -23,14 +23,13 @@ import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.flink.annotation.Public;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
@@ -56,33 +55,34 @@ public class DorisSystem {
this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
}
- public List<String> listDatabases() throws Exception {
+ public List<String> listDatabases() {
return extractColumnValuesBySQL(
"SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
1,
dbName -> !builtinDatabases.contains(dbName));
}
- public boolean databaseExists(String database) throws Exception {
+ public boolean databaseExists(String database) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
return listDatabases().contains(database);
}
- public boolean createDatabase(String database) throws Exception {
- execute(String.format("CREATE DATABASE %s", database));
+ public boolean createDatabase(String database) {
+ execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
+ return true;
+ }
+
+ public boolean dropDatabase(String database) {
+ execute(String.format("DROP DATABASE IF EXISTS %s", database));
return true;
}
public boolean tableExists(String database, String table){
- try {
- return databaseExists(database)
- && listTables(database).contains(table);
- } catch (Exception e) {
- return false;
- }
+ return databaseExists(database)
+ && listTables(database).contains(table);
}
- public List<String> listTables(String databaseName) throws Exception {
+ public List<String> listTables(String databaseName) {
if (!databaseExists(databaseName)) {
throw new DorisRuntimeException("database" + databaseName + " is not exists");
}
@@ -93,29 +93,32 @@ public class DorisSystem {
databaseName);
}
- public void createTable(TableSchema schema) throws Exception {
+ public void dropTable(String tableName) {
+ execute(String.format("DROP TABLE IF EXISTS %s", tableName));
+ }
+
+ public void createTable(TableSchema schema) {
String ddl = buildCreateTableDDL(schema);
LOG.info("Create table with ddl:{}", ddl);
execute(ddl);
}
- public void execute(String sql) throws Exception {
- Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
- try (Statement statement = conn.createStatement()) {
+ public void execute(String sql) {
+ try (Statement statement = jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
statement.execute(sql);
+ } catch (Exception e){
+ throw new DorisSystemException(String.format("SQL query could not be executed: %s", sql), e);
}
}
- private List<String> extractColumnValuesBySQL(
+ public List<String> extractColumnValuesBySQL(
String sql,
int columnIndex,
Predicate<String> filterFunc,
- Object... params) throws Exception {
+ Object... params) {
- Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
List<String> columnValues = Lists.newArrayList();
-
- try (PreparedStatement ps = conn.prepareStatement(sql)) {
+ try (PreparedStatement ps = jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
@@ -130,7 +133,7 @@ public class DorisSystem {
}
return columnValues;
} catch (Exception e) {
- throw new CatalogException(
+ throw new DorisSystemException(
String.format(
"The following SQL query could not be executed: %s", sql),
e);
@@ -210,7 +213,7 @@ public class DorisSystem {
.append(" ")
.append(field.getTypeString())
.append(" COMMENT '")
- .append(field.getComment())
+ .append(field.getComment() == null ? "" : field.getComment())
.append("',");
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
similarity index 52%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
index ff87f9b..eec2922 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSystemException.java
@@ -14,13 +14,32 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.catalog;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
+package org.apache.doris.flink.exception;
-public class DorisCatalogOptions {
- public static final ConfigOption<String> JDBCURL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url.");
- public static final ConfigOption<String> DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+/**
+ * Unchecked exception raised when a Doris system operation (for example, executing a SQL statement) fails.
+ */
+public class DorisSystemException extends RuntimeException {
+ public DorisSystemException() {
+ super();
+ }
+
+ public DorisSystemException(String message) {
+ super(message);
+ }
+
+ public DorisSystemException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DorisSystemException(Throwable cause) {
+ super(cause);
+ }
+
+ protected DorisSystemException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
index e1452d3..6edfd50 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
@@ -17,15 +17,24 @@
package org.apache.doris.flink.catalog;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
@@ -52,10 +61,10 @@ import static org.junit.Assert.assertTrue;
public class CatalogTest {
private static final String TEST_CATALOG_NAME = "doris_catalog";
private static final String TEST_FENODES = "127.0.0.1:8030";
- private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1.78:9030";
+ private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030";
private static final String TEST_USERNAME = "root";
private static final String TEST_PWD = "";
- private static final String TEST_DB = "test";
+ private static final String TEST_DB = "db1";
private static final String TEST_TABLE = "t_all_types";
private static final String TEST_TABLE_SINK = "t_all_types_sink";
private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby";
@@ -78,6 +87,25 @@ public class CatalogTest {
.column("c_tinyint", DataTypes.TINYINT())
.build();
+ protected static final TableSchema TABLE_SCHEMA_1 =
+ TableSchema.builder()
+ .field("id", new AtomicDataType(new VarCharType(false,128)))
+ .field("c_boolean", DataTypes.BOOLEAN())
+ .field("c_char", DataTypes.CHAR(1))
+ .field("c_date", DataTypes.DATE())
+ .field("c_datetime", DataTypes.TIMESTAMP(0))
+ .field("c_decimal", DataTypes.DECIMAL(10, 2))
+ .field("c_double", DataTypes.DOUBLE())
+ .field("c_float", DataTypes.FLOAT())
+ .field("c_int", DataTypes.INT())
+ .field("c_bigint", DataTypes.BIGINT())
+ .field("c_largeint", DataTypes.STRING())
+ .field("c_smallint", DataTypes.SMALLINT())
+ .field("c_string", DataTypes.STRING())
+ .field("c_tinyint", DataTypes.TINYINT())
+ .primaryKey("id")
+ .build();
+
private static final List<Row> ALL_TYPES_ROWS =
Lists.newArrayList(
Row.ofKind(
@@ -118,9 +146,17 @@ public class CatalogTest {
@Before
public void setup() {
+ DorisConnectionOptions connectionOptions =
+ new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+ .withFenodes(TEST_FENODES)
+ .withJdbcUrl(TEST_JDBCURL)
+ .withUsername(TEST_USERNAME)
+ .withPassword(TEST_PWD)
+ .build();
+
Map<String,String> props = new HashMap<>();
props.put("sink.enable-2pc","false");
- catalog = new DorisCatalog(TEST_CATALOG_NAME, TEST_JDBCURL, TEST_DB, TEST_USERNAME, TEST_PWD, props);
+ catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
// Use doris catalog.
@@ -147,6 +183,18 @@ public class CatalogTest {
assertTrue(catalog.databaseExists(TEST_DB));
}
+ @Test
+ public void testCreateDb() throws Exception {
+ catalog.createDatabase("db1",createDb(), true);
+ assertTrue(catalog.databaseExists("db1"));
+ }
+
+ @Test
+ public void testDropDb() throws Exception {
+ catalog.dropDatabase("db1",false);
+ assertFalse(catalog.databaseExists("db1"));
+ }
+
@Test
public void testListTables() throws DatabaseNotExistException {
List<String> actual = catalog.listTables(TEST_DB);
@@ -165,12 +213,34 @@ public class CatalogTest {
}
@Test
+ @Ignore
public void testGetTable() throws TableNotExistException {
+ // TODO: resolve the STRING vs. VARCHAR type mapping (this test is ignored until then)
CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE));
System.out.println(table);
assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
}
+ @Test
+ @Ignore
+ public void testCreateTable() throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException {
+ // TODO: record the NOT NULL information of primary key columns (this test is ignored until then)
+ ObjectPath tablePath = new ObjectPath(TEST_DB, TEST_TABLE);
+ catalog.dropTable(tablePath, true);
+ catalog.createTable(tablePath, createTable(), true);
+ CatalogBaseTable tableGet = catalog.getTable(tablePath);
+ System.out.println(tableGet.getUnresolvedSchema());
+ System.out.println(TABLE_SCHEMA_1);
+ assertEquals(TABLE_SCHEMA_1, tableGet.getUnresolvedSchema());
+ }
+
+ @Test
+ public void testDropTable() throws TableNotExistException {
+ catalog.dropTable(new ObjectPath("db1", "tbl1"), true);
+ assertFalse(catalog.tableExists(new ObjectPath("db1", "tbl1")));
+ }
+
+
// ------ test select query. ------
@Test
@@ -260,4 +330,26 @@ public class CatalogTest {
.collect());
assertEquals(Lists.newArrayList(Row.ofKind(RowKind.INSERT, "catalog","100002")), results);
}
+
+ private static CatalogDatabase createDb() {
+ return new CatalogDatabaseImpl(
+ new HashMap<String, String>() {
+ {
+ put("k1", "v1");
+ }
+ },
+ "");
+ }
+
+ private static CatalogTable createTable() {
+ return new CatalogTableImpl(
+ TABLE_SCHEMA_1,
+ new HashMap<String, String>() {
+ {
+ put("connector", "doris");
+ put("table.properties.replication_num", "1");
+ }
+ },
+ "FlinkTable");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org