You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2024/04/12 03:31:03 UTC
(doris) 19/19: [feature](iceberg) Add new DDL syntax for creating partitioned Iceberg tables (#33338)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d31bca199f0a4f66de1ac2400f86c8c58a2ac0af
Author: wuwenchi <wu...@hotmail.com>
AuthorDate: Wed Apr 10 20:39:53 2024 +0800
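[feature](iceberg) Add new DDL syntax for creating partitioned Iceberg tables (#33338)
Support `PARTITION BY` for Iceberg tables. Each partition item is either a column (identity partition) or one of the transforms bucket(N, col), truncate(W, col), year(col), month(col), day(col) or hour(col). For example: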
```
create table tb1 (c1 string, ts datetime) engine = iceberg partition by (c1, day(ts)) () properties ("a"="b")
```
---
.../datasource/iceberg/DorisTypeToIcebergType.java | 14 +-
.../datasource/iceberg/IcebergMetadataOps.java | 2 +-
.../doris/datasource/iceberg/IcebergUtils.java | 94 +++++-----
.../trees/plans/commands/info/CreateTableInfo.java | 4 +
.../datasource/iceberg/CreateIcebergTableTest.java | 196 +++++++++++++++++++++
5 files changed, 247 insertions(+), 63 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
index d6370c583da..52e4b6cf17a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
@@ -102,27 +102,19 @@ public class DorisTypeToIcebergType extends DorisTypeVisitor<Type> {
PrimitiveType primitiveType = atomic.getPrimitiveType();
if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
return Types.BooleanType.get();
- } else if (primitiveType.equals(PrimitiveType.TINYINT)
- || primitiveType.equals(PrimitiveType.SMALLINT)
- || primitiveType.equals(PrimitiveType.INT)) {
+ } else if (primitiveType.equals(PrimitiveType.INT)) {
return Types.IntegerType.get();
- } else if (primitiveType.equals(PrimitiveType.BIGINT)
- || primitiveType.equals(PrimitiveType.LARGEINT)) {
+ } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
return Types.LongType.get();
} else if (primitiveType.equals(PrimitiveType.FLOAT)) {
return Types.FloatType.get();
} else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
return Types.DoubleType.get();
- } else if (primitiveType.equals(PrimitiveType.CHAR)
- || primitiveType.equals(PrimitiveType.VARCHAR)
- || primitiveType.equals(PrimitiveType.STRING)) {
+ } else if (primitiveType.equals(PrimitiveType.STRING)) {
return Types.StringType.get();
} else if (primitiveType.equals(PrimitiveType.DATE)
|| primitiveType.equals(PrimitiveType.DATEV2)) {
return Types.DateType.get();
- } else if (primitiveType.equals(PrimitiveType.TIME)
- || primitiveType.equals(PrimitiveType.TIMEV2)) {
- return Types.TimeType.get();
} else if (primitiveType.equals(PrimitiveType.DECIMALV2)
|| primitiveType.isDecimalV3Type()) {
return Types.DecimalType.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 0c188fae301..18efd7f1b7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -149,7 +149,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
Schema schema = new Schema(visit.asNestedType().asStructType().fields());
Map<String, String> properties = stmt.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
- PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
+ PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
db.setUnInitialized(true);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 08c4be4ceac..70c384a0c4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -25,10 +25,12 @@ import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
@@ -63,8 +65,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Iceberg utils
@@ -78,7 +78,6 @@ public class IcebergUtils {
}
};
static long MILLIS_TO_NANO_TIME = 1000;
- private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)");
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
@@ -415,57 +414,51 @@ public class IcebergUtils {
return slotRef;
}
- // "partition"="c1;day(c1);bucket(4,c3)"
- public static PartitionSpec solveIcebergPartitionSpec(Map<String, String> properties, Schema schema)
+    /**
+     * Builds the Iceberg {@link PartitionSpec} described by the PARTITION BY clause of a CREATE TABLE statement.
+     *
+     * <p>Every partition expression must be either a plain column reference (identity partition) or one of
+     * the transform calls {@code bucket(N, col)}, {@code truncate(W, col)}, {@code year(col)},
+     * {@code month(col)}, {@code day(col)} or {@code hour(col)}; the aliases listed in the switch below
+     * are accepted as well.
+     *
+     * @param partitionDesc the statement's partition description; {@code null} means "not partitioned"
+     * @param schema the Iceberg schema the partition columns are resolved against
+     * @return the partition spec, or {@link PartitionSpec#unpartitioned()} when {@code partitionDesc} is null
+     * @throws UserException if an expression is not a supported transform, or a transform is called with
+     *         the wrong number or kind of arguments
+     */
+    public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
throws UserException {
- if (properties.containsKey("partition")) {
- PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
- String par = properties.get("partition").replaceAll(" ", "");
- String[] pars = par.split(";");
- for (String func : pars) {
- if (func.contains("(")) {
- Matcher matcher = PARTITION_REG.matcher(func);
- if (matcher.matches()) {
- switch (matcher.group(1).toLowerCase()) {
- case "bucket":
- builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2)));
- break;
- case "year":
- case "years":
- builder.year(matcher.group(3));
- break;
- case "month":
- case "months":
- builder.month(matcher.group(3));
- break;
- case "date":
- case "day":
- case "days":
- builder.day(matcher.group(3));
- break;
- case "date_hour":
- case "hour":
- case "hours":
- builder.hour(matcher.group(3));
- break;
- case "truncate":
- builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2)));
- break;
- default:
- throw new UserException("unsupported partition for " + matcher.group(1));
- }
- } else {
- throw new UserException("failed to get partition info from " + func);
- }
- } else {
- builder.identity(func);
- }
- }
- properties.remove("partition");
- return builder.build();
- } else {
- return PartitionSpec.unpartitioned();
- }
+        if (partitionDesc == null) {
+            return PartitionSpec.unpartitioned();
+        }
+
+        ArrayList<Expr> partitionExprs = partitionDesc.getPartitionExprs();
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        for (Expr expr : partitionExprs) {
+            if (expr instanceof SlotRef) {
+                // a bare column is an identity partition
+                builder.identity(((SlotRef) expr).getColumnName());
+            } else if (expr instanceof FunctionCallExpr) {
+                String exprName = expr.getExprName();
+                List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
+                // Locale.ROOT keeps the match independent of the JVM default locale (e.g. Turkish dotless i)
+                switch (exprName.toLowerCase(Locale.ROOT)) {
+                    case "bucket":
+                        // bucket(N, col): the bucket count is the first argument
+                        checkPartitionParamCount(exprName, params, 2);
+                        builder.bucket(getPartitionColumnName(exprName, params.get(1)),
+                                getPartitionIntParam(exprName, params.get(0)));
+                        break;
+                    case "year":
+                    case "years":
+                        checkPartitionParamCount(exprName, params, 1);
+                        builder.year(getPartitionColumnName(exprName, params.get(0)));
+                        break;
+                    case "month":
+                    case "months":
+                        checkPartitionParamCount(exprName, params, 1);
+                        builder.month(getPartitionColumnName(exprName, params.get(0)));
+                        break;
+                    case "date":
+                    case "day":
+                    case "days":
+                        checkPartitionParamCount(exprName, params, 1);
+                        builder.day(getPartitionColumnName(exprName, params.get(0)));
+                        break;
+                    case "date_hour":
+                    case "hour":
+                    case "hours":
+                        checkPartitionParamCount(exprName, params, 1);
+                        builder.hour(getPartitionColumnName(exprName, params.get(0)));
+                        break;
+                    case "truncate":
+                        // truncate(W, col): the width is the first argument
+                        checkPartitionParamCount(exprName, params, 2);
+                        builder.truncate(getPartitionColumnName(exprName, params.get(1)),
+                                getPartitionIntParam(exprName, params.get(0)));
+                        break;
+                    default:
+                        throw new UserException("unsupported partition for " + exprName);
+                }
+            } else {
+                // reject instead of silently dropping the item, which would build a spec the user did not ask for
+                throw new UserException("unsupported partition expression: " + expr.toSql());
+            }
+        }
+        return builder.build();
}
+
+    /** Throws unless partition transform {@code funcName} was called with exactly {@code expected} arguments. */
+    private static void checkPartitionParamCount(String funcName, List<Expr> params, int expected)
+            throws UserException {
+        if (params.size() != expected) {
+            throw new UserException("partition transform " + funcName + " expects " + expected
+                    + " argument(s), but got " + params.size());
+        }
+    }
+
+    /** Returns the column named by a transform argument, which must be a plain column reference. */
+    private static String getPartitionColumnName(String funcName, Expr param) throws UserException {
+        if (!(param instanceof SlotRef)) {
+            throw new UserException("partition transform " + funcName + " expects a column, but got "
+                    + param.toSql());
+        }
+        return ((SlotRef) param).getColumnName();
+    }
+
+    /** Parses the integer argument (bucket count or truncate width) of a partition transform. */
+    private static int getPartitionIntParam(String funcName, Expr param) throws UserException {
+        try {
+            return Integer.parseInt(param.getStringValue());
+        } catch (NumberFormatException e) {
+            throw new UserException("partition transform " + funcName + " expects an integer, but got "
+                    + param.toSql(), e);
+        }
+    }
private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
@@ -567,5 +560,4 @@ public class IcebergUtils {
}
return -1;
}
-
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 0fc746da4e7..4a5a547f022 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -773,4 +773,8 @@ public class CreateTableInfo {
partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
comment, addRollups, null);
}
+
+    /**
+     * Overrides the {@code isExternal} flag of this create-table description.
+     * In this change it is called by CreateIcebergTableTest before {@code translateToLegacyStmt()}.
+     *
+     * @param external the new value of {@code isExternal}
+     */
+    public void setIsExternal(boolean external) {
+        isExternal = external;
+    }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
new file mode 100644
index 00000000000..863607af9d7
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.analysis.CreateCatalogStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DbName;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogFactory;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests CREATE TABLE for Iceberg: each statement is parsed by the Nereids parser, translated to a legacy
+ * {@link CreateTableStmt} and executed through {@link IcebergMetadataOps#createTable}; the resulting table
+ * is loaded back from the Hadoop catalog to check its schema, column types, partition spec and properties.
+ */
+public class CreateIcebergTableTest {
+
+ // "file://" URI of the temporary warehouse directory backing the catalog
+ public static String warehouse;
+ public static IcebergHadoopExternalCatalog icebergCatalog;
+ public static IcebergMetadataOps ops;
+ // database shared by all tests; each test creates a uniquely named table in it
+ public static String dbName = "testdb";
+ public static ConnectContext connectContext;
+
+ /**
+ * Creates the Hadoop Iceberg catalog named "iceberg" on a fresh temporary warehouse, creates database
+ * {@link #dbName} in it and installs a thread-local {@link ConnectContext}.
+ * NOTE(review): the temporary warehouse directory is not deleted when the tests finish.
+ */
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ Path warehousePath = Files.createTempDirectory("test_warehouse_");
+ warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+
+ HashMap<String, String> param = new HashMap<>();
+ param.put("type", "iceberg");
+ param.put("iceberg.catalog.type", "hadoop");
+ param.put("warehouse", warehouse);
+
+ // create catalog
+ CreateCatalogStmt createCatalogStmt = new CreateCatalogStmt(true, "iceberg", "", param, "comment");
+ icebergCatalog = (IcebergHadoopExternalCatalog) CatalogFactory.createFromStmt(1, createCatalogStmt);
+ icebergCatalog.setInitialized(true);
+
+ // create db
+ ops = new IcebergMetadataOps(icebergCatalog, icebergCatalog.getCatalog());
+ CreateDbStmt createDbStmt = new CreateDbStmt(true, new DbName("iceberg", dbName), null);
+ ops.createDb(createDbStmt);
+ // NOTE(review): set again after createDb, presumably because createDb refreshes the catalog and
+ // clears the flag; confirm before removing this apparently duplicate call
+ icebergCatalog.setInitialized(true);
+ IcebergExternalDatabase db = new IcebergExternalDatabase(icebergCatalog, 1L, dbName);
+ icebergCatalog.addDatabaseForTest(db);
+
+ // context
+ connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ }
+
+ // no PARTITION BY and no PROPERTIES: expect one column and an unpartitioned spec
+ @Test
+ public void testSimpleTable() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " (id int) engine = iceberg";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Schema schema = table.schema();
+ Assert.assertEquals(1, schema.columns().size());
+ Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+ }
+
+ // PROPERTIES entries must show up as Iceberg table properties
+ @Test
+ public void testProperties() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " (id int) engine = iceberg properties(\"a\"=\"b\")";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Schema schema = table.schema();
+ Assert.assertEquals(1, schema.columns().size());
+ Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+ Assert.assertEquals("b", table.properties().get("a"));
+ }
+
+ // checks the Iceberg type chosen for each Doris column type (e.g. datetime -> TIMESTAMP)
+ @Test
+ public void testType() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " ("
+ + "c0 int, "
+ + "c1 bigint, "
+ + "c2 float, "
+ + "c3 double, "
+ + "c4 string, "
+ + "c5 date, "
+ + "c6 decimal(20, 10), "
+ + "c7 datetime"
+ + ") engine = iceberg "
+ + "properties(\"a\"=\"b\")";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Schema schema = table.schema();
+ List<Types.NestedField> columns = schema.columns();
+ Assert.assertEquals(8, columns.size());
+ Assert.assertEquals(Type.TypeID.INTEGER, columns.get(0).type().typeId());
+ Assert.assertEquals(Type.TypeID.LONG, columns.get(1).type().typeId());
+ Assert.assertEquals(Type.TypeID.FLOAT, columns.get(2).type().typeId());
+ Assert.assertEquals(Type.TypeID.DOUBLE, columns.get(3).type().typeId());
+ Assert.assertEquals(Type.TypeID.STRING, columns.get(4).type().typeId());
+ Assert.assertEquals(Type.TypeID.DATE, columns.get(5).type().typeId());
+ Assert.assertEquals(Type.TypeID.DECIMAL, columns.get(6).type().typeId());
+ Assert.assertEquals(Type.TypeID.TIMESTAMP, columns.get(7).type().typeId());
+ }
+
+ // identity partition plus the bucket/year/month/day/hour/truncate transforms; the spec built directly
+ // with the Iceberg builder must equal the spec of the created table
+ @Test
+ public void testPartition() throws UserException {
+ TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+ String sql = "create table " + tb + " ("
+ + "id int, "
+ + "ts1 datetime, "
+ + "ts2 datetime, "
+ + "ts3 datetime, "
+ + "ts4 datetime, "
+ + "dt1 date, "
+ + "dt2 date, "
+ + "dt3 date, "
+ + "s string"
+ + ") engine = iceberg "
+ + "partition by ("
+ + "id, "
+ + "bucket(2, id), "
+ + "year(ts1), "
+ + "year(dt1), "
+ + "month(ts2), "
+ + "month(dt2), "
+ + "day(ts3), "
+ + "day(dt3), "
+ + "hour(ts4), "
+ + "truncate(10, s)) ()"
+ + "properties(\"a\"=\"b\")";
+ createTable(sql);
+ Table table = ops.getCatalog().loadTable(tb);
+ Schema schema = table.schema();
+ Assert.assertEquals(9, schema.columns().size());
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .identity("id")
+ .bucket("id", 2)
+ .year("ts1")
+ .year("dt1")
+ .month("ts2")
+ .month("dt2")
+ .day("ts3")
+ .day("dt3")
+ .hour("ts4")
+ .truncate("s", 10)
+ .build();
+ Assert.assertEquals(spec, table.spec());
+ Assert.assertEquals("b", table.properties().get("a"));
+ }
+
+ /**
+ * Parses {@code sql} with the Nereids parser, marks the statement as external, translates it to a legacy
+ * {@link CreateTableStmt} and creates the table through {@link #ops}.
+ */
+ public void createTable(String sql) throws UserException {
+ LogicalPlan plan = new NereidsParser().parseSingle(sql);
+ Assertions.assertTrue(plan instanceof CreateTableCommand);
+ CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo();
+ createTableInfo.setIsExternal(true);
+ CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
+ ops.createTable(createTableStmt);
+ }
+
+ /** Returns a random table name ("test_tb_" plus a UUID without dashes) so tests do not collide. */
+ public String getTableName() {
+ String s = "test_tb_" + UUID.randomUUID();
+ return s.replaceAll("-", "");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org