Posted to commits@doris.apache.org by mo...@apache.org on 2024/04/12 03:31:03 UTC

(doris) 19/19: [feature](iceberg) Add new DDL syntax to create Iceberg partitioned tables (#33338)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d31bca199f0a4f66de1ac2400f86c8c58a2ac0af
Author: wuwenchi <wu...@hotmail.com>
AuthorDate: Wed Apr 10 20:39:53 2024 +0800

    [feature](iceberg) Add new DDL syntax to create Iceberg partitioned tables (#33338)
    
    Support the partition by clause, for example:
    
    ```
    create table tb1 (c1 string, ts datetime) engine = iceberg partition by (c1, day(ts)) () properties ("a"="b")
    ```
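    
    For illustration, a sketch combining the transforms exercised by the new test (identity, bucket, truncate, and the year/month/day/hour transforms); table and column names are placeholders, and the count/width argument comes first, as in the test:
    
    ```
    -- placeholder names, illustrative only: bucket(<count>, <col>), truncate(<width>, <col>)
    create table tb2 (id int, ts datetime, dt date, s string) engine = iceberg
    partition by (id, bucket(2, id), day(ts), month(dt), truncate(10, s)) ()
    properties ("a"="b")
    ```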
---
 .../datasource/iceberg/DorisTypeToIcebergType.java |  14 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |   2 +-
 .../doris/datasource/iceberg/IcebergUtils.java     |  94 +++++-----
 .../trees/plans/commands/info/CreateTableInfo.java |   4 +
 .../datasource/iceberg/CreateIcebergTableTest.java | 196 +++++++++++++++++++++
 5 files changed, 247 insertions(+), 63 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
index d6370c583da..52e4b6cf17a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
@@ -102,27 +102,19 @@ public class DorisTypeToIcebergType extends DorisTypeVisitor<Type> {
         PrimitiveType primitiveType = atomic.getPrimitiveType();
         if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
             return Types.BooleanType.get();
-        } else if (primitiveType.equals(PrimitiveType.TINYINT)
-                || primitiveType.equals(PrimitiveType.SMALLINT)
-                || primitiveType.equals(PrimitiveType.INT)) {
+        } else if (primitiveType.equals(PrimitiveType.INT)) {
             return Types.IntegerType.get();
-        } else if (primitiveType.equals(PrimitiveType.BIGINT)
-                || primitiveType.equals(PrimitiveType.LARGEINT)) {
+        } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
             return Types.LongType.get();
         } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
             return Types.FloatType.get();
         } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
             return Types.DoubleType.get();
-        } else if (primitiveType.equals(PrimitiveType.CHAR)
-                || primitiveType.equals(PrimitiveType.VARCHAR)
-                || primitiveType.equals(PrimitiveType.STRING)) {
+        } else if (primitiveType.equals(PrimitiveType.STRING)) {
             return Types.StringType.get();
         } else if (primitiveType.equals(PrimitiveType.DATE)
                 || primitiveType.equals(PrimitiveType.DATEV2)) {
             return Types.DateType.get();
-        } else if (primitiveType.equals(PrimitiveType.TIME)
-                || primitiveType.equals(PrimitiveType.TIMEV2)) {
-            return Types.TimeType.get();
         } else if (primitiveType.equals(PrimitiveType.DECIMALV2)
                 || primitiveType.isDecimalV3Type()) {
             return Types.DecimalType.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 0c188fae301..18efd7f1b7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -149,7 +149,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         Schema schema = new Schema(visit.asNestedType().asStructType().fields());
         Map<String, String> properties = stmt.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
-        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
+        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
         catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
         db.setUnInitialized(true);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 08c4be4ceac..70c384a0c4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -25,10 +25,12 @@ import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.DecimalLiteral;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
@@ -63,8 +65,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Iceberg utils
@@ -78,7 +78,6 @@ public class IcebergUtils {
         }
     };
     static long MILLIS_TO_NANO_TIME = 1000;
-    private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)");
     // https://iceberg.apache.org/spec/#schemas-and-data-types
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
@@ -415,57 +414,51 @@ public class IcebergUtils {
         return slotRef;
     }
 
-    // "partition"="c1;day(c1);bucket(4,c3)"
-    public static PartitionSpec solveIcebergPartitionSpec(Map<String, String> properties, Schema schema)
+    public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
             throws UserException {
-        if (properties.containsKey("partition")) {
-            PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-            String par = properties.get("partition").replaceAll(" ", "");
-            String[] pars = par.split(";");
-            for (String func : pars) {
-                if (func.contains("(")) {
-                    Matcher matcher = PARTITION_REG.matcher(func);
-                    if (matcher.matches()) {
-                        switch (matcher.group(1).toLowerCase()) {
-                            case "bucket":
-                                builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            case "year":
-                            case "years":
-                                builder.year(matcher.group(3));
-                                break;
-                            case "month":
-                            case "months":
-                                builder.month(matcher.group(3));
-                                break;
-                            case "date":
-                            case "day":
-                            case "days":
-                                builder.day(matcher.group(3));
-                                break;
-                            case "date_hour":
-                            case "hour":
-                            case "hours":
-                                builder.hour(matcher.group(3));
-                                break;
-                            case "truncate":
-                                builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            default:
-                                throw new UserException("unsupported partition for " + matcher.group(1));
-                        }
-                    } else {
-                        throw new UserException("failed to get partition info from " + func);
-                    }
-                } else {
-                    builder.identity(func);
+        if (partitionDesc == null) {
+            return PartitionSpec.unpartitioned();
+        }
+
+        ArrayList<Expr> partitionExprs = partitionDesc.getPartitionExprs();
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        for (Expr expr : partitionExprs) {
+            if (expr instanceof SlotRef) {
+                builder.identity(((SlotRef) expr).getColumnName());
+            } else if (expr instanceof FunctionCallExpr) {
+                String exprName = expr.getExprName();
+                List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
+                switch (exprName.toLowerCase()) {
+                    case "bucket":
+                        builder.bucket(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    case "year":
+                    case "years":
+                        builder.year(params.get(0).getExprName());
+                        break;
+                    case "month":
+                    case "months":
+                        builder.month(params.get(0).getExprName());
+                        break;
+                    case "date":
+                    case "day":
+                    case "days":
+                        builder.day(params.get(0).getExprName());
+                        break;
+                    case "date_hour":
+                    case "hour":
+                    case "hours":
+                        builder.hour(params.get(0).getExprName());
+                        break;
+                    case "truncate":
+                        builder.truncate(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    default:
+                        throw new UserException("unsupported partition for " + exprName);
                 }
             }
-            properties.remove("partition");
-            return builder.build();
-        } else {
-            return PartitionSpec.unpartitioned();
         }
+        return builder.build();
     }
 
     private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
@@ -567,5 +560,4 @@ public class IcebergUtils {
         }
         return -1;
     }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 0fc746da4e7..4a5a547f022 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -773,4 +773,8 @@ public class CreateTableInfo {
                 partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
                 comment, addRollups, null);
     }
+
+    public void setIsExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
new file mode 100644
index 00000000000..863607af9d7
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.analysis.CreateCatalogStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DbName;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogFactory;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+public class CreateIcebergTableTest {
+
+    public static String warehouse;
+    public static IcebergHadoopExternalCatalog icebergCatalog;
+    public static IcebergMetadataOps ops;
+    public static String dbName = "testdb";
+    public static ConnectContext connectContext;
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        Path warehousePath = Files.createTempDirectory("test_warehouse_");
+        warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+
+        HashMap<String, String> param = new HashMap<>();
+        param.put("type", "iceberg");
+        param.put("iceberg.catalog.type", "hadoop");
+        param.put("warehouse", warehouse);
+
+        // create catalog
+        CreateCatalogStmt createCatalogStmt = new CreateCatalogStmt(true, "iceberg", "", param, "comment");
+        icebergCatalog = (IcebergHadoopExternalCatalog) CatalogFactory.createFromStmt(1, createCatalogStmt);
+        icebergCatalog.setInitialized(true);
+
+        // create db
+        ops = new IcebergMetadataOps(icebergCatalog, icebergCatalog.getCatalog());
+        CreateDbStmt createDbStmt = new CreateDbStmt(true, new DbName("iceberg", dbName), null);
+        ops.createDb(createDbStmt);
+        icebergCatalog.setInitialized(true);
+        IcebergExternalDatabase db = new IcebergExternalDatabase(icebergCatalog, 1L, dbName);
+        icebergCatalog.addDatabaseForTest(db);
+
+        // context
+        connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+    }
+
+    @Test
+    public void testSimpleTable() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " (id int) engine = iceberg";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(1, schema.columns().size());
+        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+    }
+
+    @Test
+    public void testProperties() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " (id int) engine = iceberg properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(1, schema.columns().size());
+        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+        Assert.assertEquals("b", table.properties().get("a"));
+    }
+
+    @Test
+    public void testType() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " ("
+                + "c0 int, "
+                + "c1 bigint, "
+                + "c2 float, "
+                + "c3 double, "
+                + "c4 string, "
+                + "c5 date, "
+                + "c6 decimal(20, 10), "
+                + "c7 datetime"
+                + ") engine = iceberg "
+                + "properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        List<Types.NestedField> columns = schema.columns();
+        Assert.assertEquals(8, columns.size());
+        Assert.assertEquals(Type.TypeID.INTEGER, columns.get(0).type().typeId());
+        Assert.assertEquals(Type.TypeID.LONG, columns.get(1).type().typeId());
+        Assert.assertEquals(Type.TypeID.FLOAT, columns.get(2).type().typeId());
+        Assert.assertEquals(Type.TypeID.DOUBLE, columns.get(3).type().typeId());
+        Assert.assertEquals(Type.TypeID.STRING, columns.get(4).type().typeId());
+        Assert.assertEquals(Type.TypeID.DATE, columns.get(5).type().typeId());
+        Assert.assertEquals(Type.TypeID.DECIMAL, columns.get(6).type().typeId());
+        Assert.assertEquals(Type.TypeID.TIMESTAMP, columns.get(7).type().typeId());
+    }
+
+    @Test
+    public void testPartition() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " ("
+                + "id int, "
+                + "ts1 datetime, "
+                + "ts2 datetime, "
+                + "ts3 datetime, "
+                + "ts4 datetime, "
+                + "dt1 date, "
+                + "dt2 date, "
+                + "dt3 date, "
+                + "s string"
+                + ") engine = iceberg "
+                + "partition by ("
+                + "id, "
+                + "bucket(2, id), "
+                + "year(ts1), "
+                + "year(dt1), "
+                + "month(ts2), "
+                + "month(dt2), "
+                + "day(ts3), "
+                + "day(dt3), "
+                + "hour(ts4), "
+                + "truncate(10, s)) ()"
+                + "properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(9, schema.columns().size());
+        PartitionSpec spec = PartitionSpec.builderFor(schema)
+                .identity("id")
+                .bucket("id", 2)
+                .year("ts1")
+                .year("dt1")
+                .month("ts2")
+                .month("dt2")
+                .day("ts3")
+                .day("dt3")
+                .hour("ts4")
+                .truncate("s", 10)
+                .build();
+        Assert.assertEquals(spec, table.spec());
+        Assert.assertEquals("b", table.properties().get("a"));
+    }
+
+    public void createTable(String sql) throws UserException {
+        LogicalPlan plan = new NereidsParser().parseSingle(sql);
+        Assertions.assertTrue(plan instanceof CreateTableCommand);
+        CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo();
+        createTableInfo.setIsExternal(true);
+        CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
+        ops.createTable(createTableStmt);
+    }
+
+    public String getTableName() {
+        String s = "test_tb_" + UUID.randomUUID();
+        return s.replaceAll("-", "");
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org