You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/26 01:52:21 UTC

[doris] branch master updated: [fix](random-distribution) Restrict aggregate keys tables with REPLACE-type columns and unique keys tables to hash distribution so that data is computed correctly (#10414)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4408231765 [fix](random-distribution) Restrict aggregate keys tables with REPLACE-type columns and unique keys tables to hash distribution so that data is computed correctly (#10414)
4408231765 is described below

commit 44082317659ccd81355029d9cc9e7caef29ca641
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Sun Jun 26 09:52:16 2022 +0800

    [fix](random-distribution) Restrict aggregate keys tables with REPLACE-type columns and unique keys tables to hash distribution so that data is computed correctly (#10414)
---
 .../apache/doris/alter/SchemaChangeHandler.java    | 13 ++++
 .../org/apache/doris/analysis/CreateTableStmt.java | 27 ++++----
 .../java/org/apache/doris/catalog/Catalog.java     | 15 +++++
 .../org/apache/doris/catalog/DistributionInfo.java | 33 ---------
 .../apache/doris/alter/SchemaChangeJobV2Test.java  | 78 ++++++++++++++++++++--
 .../org/apache/doris/catalog/CatalogTestUtil.java  | 14 ----
 .../org/apache/doris/catalog/CreateTableTest.java  | 30 +++++++++
 7 files changed, 145 insertions(+), 65 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 94515cfcfb..574c908803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -48,6 +48,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.ReplicaAllocation;
@@ -775,6 +776,13 @@ public class SchemaChangeHandler extends AlterHandler {
                     && newColumn.getDefaultValue() != null && !newColumn.getDefaultValue().equals("0")) {
                 throw new DdlException("The default value of '"
                         + newColName + "' with SUM aggregation function must be zero");
+            } else if (olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) {
+                if (newColumn.getAggregationType() == AggregateType.REPLACE
+                        || newColumn.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
+                    throw new DdlException("Can not add value column with aggregation type "
+                                + newColumn.getAggregationType() + " for olap table with random distribution : "
+                                + newColName);
+                }
             }
         } else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
             if (newColumn.getAggregationType() != null) {
@@ -1499,6 +1507,11 @@ public class SchemaChangeHandler extends AlterHandler {
                         Catalog.getCurrentCatalog().modifyTableColocate(db, olapTable, colocateGroup, false, null);
                         return;
                     } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+                        String distributionType = properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE);
+                        if (!distributionType.equalsIgnoreCase("random")) {
+                            throw new DdlException("Only support modifying distribution type of table from"
+                                + " hash to random");
+                        }
                         Catalog.getCurrentCatalog().convertDistributionType(db, olapTable);
                         return;
                     } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 56f20d2113..b85aa21b33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.PrimitiveType;
@@ -90,9 +91,6 @@ public class CreateTableStmt extends DdlStmt {
         engineNames.add("hudi");
     }
 
-    // for backup. set to -1 for normal use
-    private int tableSignature;
-
     public CreateTableStmt() {
         // for persist
         tableName = new TableName();
@@ -163,7 +161,6 @@ public class CreateTableStmt extends DdlStmt {
         this.ifNotExists = ifNotExists;
         this.comment = Strings.nullToEmpty(comment);
 
-        this.tableSignature = -1;
         this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList;
     }
 
@@ -243,14 +240,6 @@ public class CreateTableStmt extends DdlStmt {
         return tableName.getDb();
     }
 
-    public void setTableSignature(int tableSignature) {
-        this.tableSignature = tableSignature;
-    }
-
-    public int getTableSignature() {
-        return tableSignature;
-    }
-
     public void setTableName(String newTableName) {
         tableName = new TableName(tableName.getDb(), newTableName);
     }
@@ -421,6 +410,20 @@ public class CreateTableStmt extends DdlStmt {
                 throw new AnalysisException("Create olap table should contain distribution desc");
             }
             distributionDesc.analyze(columnSet, columnDefs);
+            if (distributionDesc.type == DistributionInfo.DistributionInfoType.RANDOM) {
+                if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
+                    throw new AnalysisException("Create unique keys table should not contain random distribution desc");
+                } else if (keysDesc.getKeysType() == KeysType.AGG_KEYS) {
+                    for (ColumnDef columnDef : columnDefs) {
+                        if (columnDef.getAggregateType() == AggregateType.REPLACE
+                                || columnDef.getAggregateType() == AggregateType.REPLACE_IF_NOT_NULL) {
+                            throw new AnalysisException("Create aggregate keys table with value columns of which"
+                                + " aggregate type is " + columnDef.getAggregateType() + " should not contain random"
+                                + " distribution desc");
+                        }
+                    }
+                }
+            }
         } else if (engineName.equalsIgnoreCase("elasticsearch")) {
             EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc);
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 4156054c9d..8615350160 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4597,6 +4597,21 @@ public class Catalog {
     public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
         tbl.writeLockOrDdlException();
         try {
+            if (tbl.isColocateTable()) {
+                throw new DdlException("Cannot change distribution type of colocate table.");
+            }
+            if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
+                throw new DdlException("Cannot change distribution type of unique keys table.");
+            }
+            if (tbl.getKeysType() == KeysType.AGG_KEYS) {
+                for (Column column : tbl.getBaseSchema()) {
+                    if (column.getAggregationType() == AggregateType.REPLACE
+                            || column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
+                        throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
+                            + " columns with " + column.getAggregationType() + " type.");
+                    }
+                }
+            }
             if (!tbl.convertHashDistributionToRandomDistribution()) {
                 throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
index 3a22f44f0f..959076f9f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
@@ -18,19 +18,15 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.DistributionDesc;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 
-import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang.NotImplementedException;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 public abstract class DistributionInfo implements Writable {
 
@@ -88,33 +84,4 @@ public abstract class DistributionInfo implements Writable {
     public boolean equals(DistributionInfo info) {
         return false;
     }
-
-    public static List<Expr> toDistExpr(OlapTable tbl, DistributionInfo distInfo, Map<String, Expr> exprByCol) {
-        List<Expr> distExprs = Lists.newArrayList();
-        if (distInfo instanceof RandomDistributionInfo) {
-            for (Column col : tbl.getBaseSchema()) {
-                if (col.isKey()) {
-                    Expr distExpr = exprByCol.get(col.getName());
-                    // used to compute hash
-                    if (col.getDataType() == PrimitiveType.CHAR) {
-                        distExpr.setType(Type.CHAR);
-                    }
-                    distExprs.add(distExpr);
-                } else {
-                    break;
-                }
-            }
-        } else if (distInfo instanceof HashDistributionInfo) {
-            HashDistributionInfo hashDistInfo = (HashDistributionInfo) distInfo;
-            for (Column col : hashDistInfo.getDistributionColumns()) {
-                Expr distExpr = exprByCol.get(col.getName());
-                // used to compute hash
-                if (col.getDataType() == PrimitiveType.CHAR) {
-                    distExpr.setType(Type.CHAR);
-                }
-                distExprs.add(distExpr);
-            }
-        }
-        return distExprs;
-    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 02771d213e..aadc893d90 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -31,11 +31,13 @@ import org.apache.doris.backup.CatalogMocker;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.FakeCatalog;
 import org.apache.doris.catalog.FakeEditLog;
+import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -47,6 +49,7 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
@@ -62,7 +65,10 @@ import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.FakeTransactionIDGenerator;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -353,7 +359,7 @@ public class SchemaChangeJobV2Test {
         Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets());
     }
 
-    public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey)
+    public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue)
             throws UserException {
         fakeCatalog = new FakeCatalog();
         FakeCatalog.setCatalog(masterCatalog);
@@ -375,11 +381,11 @@ public class SchemaChangeJobV2Test {
 
     @Test
     public void testModifyDynamicPartitionWithoutTableProperty() throws UserException {
-        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT);
-        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE);
-        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE);
-        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE);
-        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE);
+        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false");
+        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day");
+        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3");
+        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p");
+        modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30");
     }
 
     @Test
@@ -433,4 +439,64 @@ public class SchemaChangeJobV2Test {
         Partition partition1 = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
         Assert.assertTrue(partition1.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM);
     }
+
+    @Test
+    public void testAbnormalModifyTableDistributionType1(@Injectable OlapTable table) throws UserException {
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
+        Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+        new Expectations() {
+            {
+                table.isColocateTable();
+                result = true;
+            }
+        };
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of colocate table.");
+        Catalog.getCurrentCatalog().convertDistributionType(db, table);
+    }
+
+    @Test
+    public void testAbnormalModifyTableDistributionType2(@Injectable OlapTable table) throws UserException {
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
+        Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+        new Expectations() {
+            {
+                table.isColocateTable();
+                result = false;
+                table.getKeysType();
+                result = KeysType.UNIQUE_KEYS;
+            }
+        };
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of unique keys table.");
+        Catalog.getCurrentCatalog().convertDistributionType(db, table);
+    }
+
+    @Test
+    public void testAbnormalModifyTableDistributionType3(@Injectable OlapTable table) throws UserException {
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
+        Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+        new Expectations() {
+            {
+                table.isColocateTable();
+                result = false;
+                table.getKeysType();
+                result = KeysType.AGG_KEYS;
+                table.getBaseSchema();
+                result = Lists.newArrayList(
+                    new Column("k1", Type.INT, true, null, "0", ""),
+                    new Column("v1", Type.INT, false, AggregateType.REPLACE, "0", ""));
+            }
+        };
+        expectedEx.expect(DdlException.class);
+        expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change "
+                + "distribution type of aggregate keys table which has value columns with REPLACE type.");
+        Catalog.getCurrentCatalog().convertDistributionType(db, table);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 7d81d30df4..b5c28e11bb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 
@@ -38,7 +37,6 @@ import com.google.common.collect.Maps;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -325,16 +323,4 @@ public class CatalogTestUtil {
         backend.setAlive(true);
         return backend;
     }
-
-    public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort,
-            long totalCapacityB, long avaiLabelCapacityB) {
-        Backend backend = createBackend(id, host, heartPort, bePort, httpPort);
-        Map<String, TDisk> backendDisks = new HashMap<String, TDisk>();
-        String rootPath = "root_path";
-        TDisk disk = new TDisk(rootPath, totalCapacityB, avaiLabelCapacityB, true);
-        backendDisks.put(rootPath, disk);
-        backend.updateDisks(backendDisks);
-        backend.setAlive(true);
-        return backend;
-    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index f481c679c3..d1f7a3320d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -485,6 +485,36 @@ public class CreateTableTest {
                                 + "    \"dynamic_partition.start_day_of_month\" = \"3\"\n"
                                 + ");"));
 
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+                "Create unique keys table should not contain random distribution desc",
+                () -> createTable("CREATE TABLE test.tbl21\n"
+                            + "(\n"
+                            + "  `k1` bigint(20) NULL COMMENT \"\",\n"
+                            + "  `k2` largeint(40) NULL COMMENT \"\",\n"
+                            + "  `v1` varchar(204) NULL COMMENT \"\",\n"
+                            + "  `v2` smallint(6) NULL DEFAULT \"10\" COMMENT \"\"\n"
+                            + ") ENGINE=OLAP\n"
+                            + "UNIQUE KEY(`k1`, `k2`)\n"
+                            + "DISTRIBUTED BY RANDOM BUCKETS 32\n"
+                            + "PROPERTIES (\n"
+                            + "\"replication_allocation\" = \"tag.location.default: 1\"\n"
+                            + ");"));
+
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+                "Create aggregate keys table with value columns of which aggregate type"
+                    + " is REPLACE should not contain random distribution desc",
+                () -> createTable("CREATE TABLE test.tbl22\n"
+                    + "(\n"
+                    + "  `k1` bigint(20) NULL COMMENT \"\",\n"
+                    + "  `k2` largeint(40) NULL COMMENT \"\",\n"
+                    + "  `v1` bigint(20) REPLACE NULL COMMENT \"\",\n"
+                    + "  `v2` smallint(6) REPLACE_IF_NOT_NULL NULL DEFAULT \"10\" COMMENT \"\"\n"
+                    + ") ENGINE=OLAP\n"
+                    + "AGGREGATE KEY(`k1`, `k2`)\n"
+                    + "DISTRIBUTED BY RANDOM BUCKETS 32\n"
+                    + "PROPERTIES (\n"
+                    + "\"replication_allocation\" = \"tag.location.default: 1\"\n"
+                    + ");"));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org