You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/26 01:52:21 UTC
[doris] branch master updated: [fix](random-distribution) Make aggregate keys table with replace type columns and unique keys table can only have hash distribution to make data computing correctly (#10414)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4408231765 [fix](random-distribution) Make aggregate keys table with replace type columns and unique keys table can only have hash distribution to make data computing correctly (#10414)
4408231765 is described below
commit 44082317659ccd81355029d9cc9e7caef29ca641
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Sun Jun 26 09:52:16 2022 +0800
[fix](random-distribution) Make aggregate keys table with replace type columns and unique keys table can only have hash distribution to make data computing correctly (#10414)
---
.../apache/doris/alter/SchemaChangeHandler.java | 13 ++++
.../org/apache/doris/analysis/CreateTableStmt.java | 27 ++++----
.../java/org/apache/doris/catalog/Catalog.java | 15 +++++
.../org/apache/doris/catalog/DistributionInfo.java | 33 ---------
.../apache/doris/alter/SchemaChangeJobV2Test.java | 78 ++++++++++++++++++++--
.../org/apache/doris/catalog/CatalogTestUtil.java | 14 ----
.../org/apache/doris/catalog/CreateTableTest.java | 30 +++++++++
7 files changed, 145 insertions(+), 65 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 94515cfcfb..574c908803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -48,6 +48,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
@@ -775,6 +776,13 @@ public class SchemaChangeHandler extends AlterHandler {
&& newColumn.getDefaultValue() != null && !newColumn.getDefaultValue().equals("0")) {
throw new DdlException("The default value of '"
+ newColName + "' with SUM aggregation function must be zero");
+ } else if (olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) {
+ if (newColumn.getAggregationType() == AggregateType.REPLACE
+ || newColumn.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
+ throw new DdlException("Can not add value column with aggregation type "
+ + newColumn.getAggregationType() + " for olap table with random distribution : "
+ + newColName);
+ }
}
} else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
if (newColumn.getAggregationType() != null) {
@@ -1499,6 +1507,11 @@ public class SchemaChangeHandler extends AlterHandler {
Catalog.getCurrentCatalog().modifyTableColocate(db, olapTable, colocateGroup, false, null);
return;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+ String distributionType = properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE);
+ if (!distributionType.equalsIgnoreCase("random")) {
+ throw new DdlException("Only support modifying distribution type of table from"
+ + " hash to random");
+ }
Catalog.getCurrentCatalog().convertDistributionType(db, olapTable);
return;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 56f20d2113..b85aa21b33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
@@ -90,9 +91,6 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("hudi");
}
- // for backup. set to -1 for normal use
- private int tableSignature;
-
public CreateTableStmt() {
// for persist
tableName = new TableName();
@@ -163,7 +161,6 @@ public class CreateTableStmt extends DdlStmt {
this.ifNotExists = ifNotExists;
this.comment = Strings.nullToEmpty(comment);
- this.tableSignature = -1;
this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList;
}
@@ -243,14 +240,6 @@ public class CreateTableStmt extends DdlStmt {
return tableName.getDb();
}
- public void setTableSignature(int tableSignature) {
- this.tableSignature = tableSignature;
- }
-
- public int getTableSignature() {
- return tableSignature;
- }
-
public void setTableName(String newTableName) {
tableName = new TableName(tableName.getDb(), newTableName);
}
@@ -421,6 +410,20 @@ public class CreateTableStmt extends DdlStmt {
throw new AnalysisException("Create olap table should contain distribution desc");
}
distributionDesc.analyze(columnSet, columnDefs);
+ if (distributionDesc.type == DistributionInfo.DistributionInfoType.RANDOM) {
+ if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
+ throw new AnalysisException("Create unique keys table should not contain random distribution desc");
+ } else if (keysDesc.getKeysType() == KeysType.AGG_KEYS) {
+ for (ColumnDef columnDef : columnDefs) {
+ if (columnDef.getAggregateType() == AggregateType.REPLACE
+ || columnDef.getAggregateType() == AggregateType.REPLACE_IF_NOT_NULL) {
+ throw new AnalysisException("Create aggregate keys table with value columns of which"
+ + " aggregate type is " + columnDef.getAggregateType() + " should not contain random"
+ + " distribution desc");
+ }
+ }
+ }
+ }
} else if (engineName.equalsIgnoreCase("elasticsearch")) {
EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 4156054c9d..8615350160 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4597,6 +4597,21 @@ public class Catalog {
public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
tbl.writeLockOrDdlException();
try {
+ if (tbl.isColocateTable()) {
+ throw new DdlException("Cannot change distribution type of colocate table.");
+ }
+ if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
+ throw new DdlException("Cannot change distribution type of unique keys table.");
+ }
+ if (tbl.getKeysType() == KeysType.AGG_KEYS) {
+ for (Column column : tbl.getBaseSchema()) {
+ if (column.getAggregationType() == AggregateType.REPLACE
+ || column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
+ throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
+ + " columns with " + column.getAggregationType() + " type.");
+ }
+ }
+ }
if (!tbl.convertHashDistributionToRandomDistribution()) {
throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
index 3a22f44f0f..959076f9f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
@@ -18,19 +18,15 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.DistributionDesc;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang.NotImplementedException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
public abstract class DistributionInfo implements Writable {
@@ -88,33 +84,4 @@ public abstract class DistributionInfo implements Writable {
public boolean equals(DistributionInfo info) {
return false;
}
-
- public static List<Expr> toDistExpr(OlapTable tbl, DistributionInfo distInfo, Map<String, Expr> exprByCol) {
- List<Expr> distExprs = Lists.newArrayList();
- if (distInfo instanceof RandomDistributionInfo) {
- for (Column col : tbl.getBaseSchema()) {
- if (col.isKey()) {
- Expr distExpr = exprByCol.get(col.getName());
- // used to compute hash
- if (col.getDataType() == PrimitiveType.CHAR) {
- distExpr.setType(Type.CHAR);
- }
- distExprs.add(distExpr);
- } else {
- break;
- }
- }
- } else if (distInfo instanceof HashDistributionInfo) {
- HashDistributionInfo hashDistInfo = (HashDistributionInfo) distInfo;
- for (Column col : hashDistInfo.getDistributionColumns()) {
- Expr distExpr = exprByCol.get(col.getName());
- // used to compute hash
- if (col.getDataType() == PrimitiveType.CHAR) {
- distExpr.setType(Type.CHAR);
- }
- distExprs.add(distExpr);
- }
- }
- return distExprs;
- }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 02771d213e..aadc893d90 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -31,11 +31,13 @@ import org.apache.doris.backup.CatalogMocker;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.catalog.FakeEditLog;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -47,6 +49,7 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@@ -62,7 +65,10 @@ import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -353,7 +359,7 @@ public class SchemaChangeJobV2Test {
Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets());
}
- public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey)
+ public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue)
throws UserException {
fakeCatalog = new FakeCatalog();
FakeCatalog.setCatalog(masterCatalog);
@@ -375,11 +381,11 @@ public class SchemaChangeJobV2Test {
@Test
public void testModifyDynamicPartitionWithoutTableProperty() throws UserException {
- modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT);
- modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE);
- modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE);
- modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE);
- modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE);
+ modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false");
+ modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day");
+ modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3");
+ modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p");
+ modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30");
}
@Test
@@ -433,4 +439,64 @@ public class SchemaChangeJobV2Test {
Partition partition1 = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
Assert.assertTrue(partition1.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM);
}
+
+ @Test
+ public void testAbnormalModifyTableDistributionType1(@Injectable OlapTable table) throws UserException {
+ fakeCatalog = new FakeCatalog();
+ fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
+ Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+ new Expectations() {
+ {
+ table.isColocateTable();
+ result = true;
+ }
+ };
+ expectedEx.expect(DdlException.class);
+ expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of colocate table.");
+ Catalog.getCurrentCatalog().convertDistributionType(db, table);
+ }
+
+ @Test
+ public void testAbnormalModifyTableDistributionType2(@Injectable OlapTable table) throws UserException {
+ fakeCatalog = new FakeCatalog();
+ fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
+ Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+ new Expectations() {
+ {
+ table.isColocateTable();
+ result = false;
+ table.getKeysType();
+ result = KeysType.UNIQUE_KEYS;
+ }
+ };
+ expectedEx.expect(DdlException.class);
+ expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of unique keys table.");
+ Catalog.getCurrentCatalog().convertDistributionType(db, table);
+ }
+
+ @Test
+ public void testAbnormalModifyTableDistributionType3(@Injectable OlapTable table) throws UserException {
+ fakeCatalog = new FakeCatalog();
+ fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
+ Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
+ new Expectations() {
+ {
+ table.isColocateTable();
+ result = false;
+ table.getKeysType();
+ result = KeysType.AGG_KEYS;
+ table.getBaseSchema();
+ result = Lists.newArrayList(
+ new Column("k1", Type.INT, true, null, "0", ""),
+ new Column("v1", Type.INT, false, AggregateType.REPLACE, "0", ""));
+ }
+ };
+ expectedEx.expect(DdlException.class);
+ expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change "
+ + "distribution type of aggregate keys table which has value columns with REPLACE type.");
+ Catalog.getCurrentCatalog().convertDistributionType(db, table);
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 7d81d30df4..b5c28e11bb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -38,7 +37,6 @@ import com.google.common.collect.Maps;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -325,16 +323,4 @@ public class CatalogTestUtil {
backend.setAlive(true);
return backend;
}
-
- public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort,
- long totalCapacityB, long avaiLabelCapacityB) {
- Backend backend = createBackend(id, host, heartPort, bePort, httpPort);
- Map<String, TDisk> backendDisks = new HashMap<String, TDisk>();
- String rootPath = "root_path";
- TDisk disk = new TDisk(rootPath, totalCapacityB, avaiLabelCapacityB, true);
- backendDisks.put(rootPath, disk);
- backend.updateDisks(backendDisks);
- backend.setAlive(true);
- return backend;
- }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index f481c679c3..d1f7a3320d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -485,6 +485,36 @@ public class CreateTableTest {
+ " \"dynamic_partition.start_day_of_month\" = \"3\"\n"
+ ");"));
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+ "Create unique keys table should not contain random distribution desc",
+ () -> createTable("CREATE TABLE test.tbl21\n"
+ + "(\n"
+ + " `k1` bigint(20) NULL COMMENT \"\",\n"
+ + " `k2` largeint(40) NULL COMMENT \"\",\n"
+ + " `v1` varchar(204) NULL COMMENT \"\",\n"
+ + " `v2` smallint(6) NULL DEFAULT \"10\" COMMENT \"\"\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`k1`, `k2`)\n"
+ + "DISTRIBUTED BY RANDOM BUCKETS 32\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\"\n"
+ + ");"));
+
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+ "Create aggregate keys table with value columns of which aggregate type"
+ + " is REPLACE should not contain random distribution desc",
+ () -> createTable("CREATE TABLE test.tbl22\n"
+ + "(\n"
+ + " `k1` bigint(20) NULL COMMENT \"\",\n"
+ + " `k2` largeint(40) NULL COMMENT \"\",\n"
+ + " `v1` bigint(20) REPLACE NULL COMMENT \"\",\n"
+ + " `v2` smallint(6) REPLACE_IF_NOT_NULL NULL DEFAULT \"10\" COMMENT \"\"\n"
+ + ") ENGINE=OLAP\n"
+ + "AGGREGATE KEY(`k1`, `k2`)\n"
+ + "DISTRIBUTED BY RANDOM BUCKETS 32\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\"\n"
+ + ");"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org