You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by xu...@apache.org on 2023/06/05 04:20:57 UTC
[doris] branch master updated: [fix](dynamic partition) partition create failed after alter distributed column (#20239)
This is an automated email from the ASF dual-hosted git repository.
xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3c28a71378 [fix](dynamic partition) partition create failed after alter distributed column (#20239)
3c28a71378 is described below
commit 3c28a7137830e8bebec826ff7d08215665b8835d
Author: camby <zh...@baidu.com>
AuthorDate: Mon Jun 5 12:20:50 2023 +0800
[fix](dynamic partition) partition create failed after alter distributed column (#20239)
This PR fixes the following two problems:
Problem 1: altering a distribution column's comment makes adding a dynamic partition fail (see issue #10811). Steps to reproduce:
create table with dynamic partition policy;
restart FE;
alter distribution column comment;
alter dynamic_partition.end to trigger the dynamic partition scheduler to add a new partition;
Then we get the following error log, and creation of the new partition fails.
dynamic add partition failed: errCode = 2, detailMessage = Cannot assign hash distribution with different distribution cols. default is: [id int(11) NULL COMMENT 'new_comment_of_id'], db: default_cluster:example_db, table: test_2
Problem 2: renaming a distribution column makes inserts into old partitions fail (see #20405).
The key point of the reproduce steps is restart FE.
It seems all versions are affected, including master, lts-1.1, and so on.
---
.../doris/analysis/HashDistributionDesc.java | 4 +-
.../main/java/org/apache/doris/catalog/Column.java | 22 ++++++++
.../main/java/org/apache/doris/catalog/Env.java | 25 ++++++---
.../apache/doris/catalog/HashDistributionInfo.java | 14 ++++-
.../apache/doris/datasource/InternalCatalog.java | 13 ++---
regression-test/pipeline/p0/conf/fe.conf | 2 +
regression-test/pipeline/p1/conf/fe.conf | 2 +
.../test_dynamic_partition_with_alter.groovy | 56 ++++++++++++++++++++
.../test_dynamic_partition_with_rename.groovy | 60 ++++++++++++++++++++++
9 files changed, 181 insertions(+), 17 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
index d54049f793..374bf10bdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
@@ -131,7 +131,9 @@ public class HashDistributionDesc extends DistributionDesc {
+ column.getName() + "].");
}
- distributionColumns.add(column);
+ // distribution info and base columns persist seperately inside OlapTable, so we need deep copy
+ // to avoid modify table columns also modify columns inside distribution info.
+ distributionColumns.add(new Column(column));
find = true;
break;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 615851e257..936db90a35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -784,6 +784,28 @@ public class Column implements Writable, GsonPostProcessable {
&& Objects.equals(realDefaultValue, other.realDefaultValue);
}
+ // distribution column compare only care about attrs which affect data,
+ // do not care about attrs, such as comment
+ public boolean equalsForDistribution(Column other) {
+ if (other == this) {
+ return true;
+ }
+
+ return name.equalsIgnoreCase(other.name)
+ && Objects.equals(getDefaultValue(), other.getDefaultValue())
+ && Objects.equals(aggregationType, other.aggregationType)
+ && isAggregationTypeImplicit == other.isAggregationTypeImplicit
+ && isKey == other.isKey
+ && isAllowNull == other.isAllowNull
+ && getDataType().equals(other.getDataType())
+ && getStrLen() == other.getStrLen()
+ && getPrecision() == other.getPrecision()
+ && getScale() == other.getScale()
+ && visible == other.visible
+ && Objects.equals(children, other.children)
+ && Objects.equals(realDefaultValue, other.realDefaultValue);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index bc3a3b4f76..c64b4a2c74 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4290,12 +4290,26 @@ public class Env {
// 4. modify distribution info
DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
+ // modify default distribution info
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column column : distributionColumns) {
if (column.getName().equalsIgnoreCase(colName)) {
column.setName(newColName);
}
}
+ // modify distribution info inside partitions
+ for (Partition p : table.getPartitions()) {
+ DistributionInfo partDistInfo = p.getDistributionInfo();
+ if (partDistInfo.getType() != DistributionInfoType.HASH) {
+ continue;
+ }
+ List<Column> partDistColumns = ((HashDistributionInfo) partDistInfo).getDistributionColumns();
+ for (Column column : partDistColumns) {
+ if (column.getName().equalsIgnoreCase(colName)) {
+ column.setName(newColName);
+ }
+ }
+ }
}
// 5. modify sequence map col
@@ -4546,13 +4560,10 @@ public class Env {
}
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
- List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols
- = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
- if (!newDistriCols.equals(defaultDistriCols)) {
- throw new DdlException(
- "Cannot assign hash distribution with different distribution cols. " + "default is: "
- + defaultDistriCols);
+ if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+ throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+ + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+ + ((HashDistributionInfo) distributionInfo).getDistributionColumns());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
index d746bd355f..5f30bc2098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
@@ -95,6 +95,18 @@ public class HashDistributionInfo extends DistributionInfo {
return distributionInfo;
}
+ public boolean sameDistributionColumns(HashDistributionInfo other) {
+ if (distributionColumns.size() != other.distributionColumns.size()) {
+ return false;
+ }
+ for (int i = 0; i < distributionColumns.size(); ++i) {
+ if (!distributionColumns.get(i).equalsForDistribution(other.distributionColumns.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -107,7 +119,7 @@ public class HashDistributionInfo extends DistributionInfo {
return false;
}
HashDistributionInfo that = (HashDistributionInfo) o;
- return bucketNum == that.bucketNum && Objects.equals(distributionColumns, that.distributionColumns);
+ return bucketNum == that.bucketNum && sameDistributionColumns(that);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index d1c0354337..1fc3b20bcc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1410,17 +1410,14 @@ public class InternalCatalog implements CatalogIf<Database> {
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
- List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
- .getDistributionColumns();
- if (!newDistriCols.equals(defaultDistriCols)) {
- throw new DdlException(
- "Cannot assign hash distribution with different distribution cols. " + "default is: "
- + defaultDistriCols);
- }
if (hashDistributionInfo.getBucketNum() <= 0) {
throw new DdlException("Cannot assign hash distribution buckets less than 1");
}
+ if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+ throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+ + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+ + ((HashDistributionInfo) distributionInfo).getDistributionColumns());
+ }
} else if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
RandomDistributionInfo randomDistributionInfo = (RandomDistributionInfo) distributionInfo;
if (randomDistributionInfo.getBucketNum() <= 0) {
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 5879c463ee..d2e8cbd282 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -80,3 +80,5 @@ enable_struct_type=true
# enable mtmv
enable_mtmv = true
+
+dynamic_partition_check_interval_seconds=5
diff --git a/regression-test/pipeline/p1/conf/fe.conf b/regression-test/pipeline/p1/conf/fe.conf
index b7fbf07bcb..ba7de606b5 100644
--- a/regression-test/pipeline/p1/conf/fe.conf
+++ b/regression-test/pipeline/p1/conf/fe.conf
@@ -82,3 +82,5 @@ enable_mtmv = true
# enable auto collect statistics
enable_auto_collect_statistics=true
auto_check_statistics_in_sec=60
+
+dynamic_partition_check_interval_seconds=5
diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy
new file mode 100644
index 0000000000..32cfc742a7
--- /dev/null
+++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_dynamic_partition_with_alter") {
+ def tbl = "test_dynamic_partition_with_alter"
+ sql "drop table if exists ${tbl}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbl}
+ ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+ AGGREGATE KEY(k1,k2)
+ PARTITION BY RANGE(k1) ( )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "dynamic_partition.enable"="true",
+ "dynamic_partition.end"="3",
+ "dynamic_partition.buckets"="1",
+ "dynamic_partition.start"="-3",
+ "dynamic_partition.prefix"="p",
+ "dynamic_partition.time_unit"="DAY",
+ "dynamic_partition.create_history_partition"="true",
+ "dynamic_partition.replication_allocation" = "tag.location.default: 1")
+ """
+ result = sql "show partitions from ${tbl}"
+ assertEquals(7, result.size())
+
+ // modify distributed column comment, then try to add too more dynamic partition
+ sql """ alter table ${tbl} modify column k1 comment 'new_comment_for_k1' """
+ sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
+ sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
+ result = sql "show partitions from ${tbl}"
+ for (def retry = 0; retry < 15; retry++) {
+ if (result.size() == 9) {
+ break;
+ }
+ logger.info("wait dynamic partition scheduler, sleep 1s")
+ sleep(1000);
+ result = sql "show partitions from ${tbl}"
+ }
+ assertEquals(9, result.size())
+
+ sql "drop table ${tbl}"
+}
diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy
new file mode 100644
index 0000000000..b07a2f1a63
--- /dev/null
+++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_dynamic_partition_with_rename") {
+ def tbl = "test_dynamic_partition_with_rename"
+ sql "drop table if exists ${tbl}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbl}
+ ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+ AGGREGATE KEY(k1,k2)
+ PARTITION BY RANGE(k1) ( )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "dynamic_partition.enable"="true",
+ "dynamic_partition.end"="3",
+ "dynamic_partition.buckets"="1",
+ "dynamic_partition.start"="-3",
+ "dynamic_partition.prefix"="p",
+ "dynamic_partition.time_unit"="DAY",
+ "dynamic_partition.create_history_partition"="true",
+ "dynamic_partition.replication_allocation" = "tag.location.default: 1")
+ """
+ result = sql "show partitions from ${tbl}"
+ assertEquals(7, result.size())
+
+ // rename distributed column, then try to add too more dynamic partition
+ sql "alter table ${tbl} rename column k1 renamed_k1"
+ sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
+ sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
+ result = sql "show partitions from ${tbl}"
+ for (def retry = 0; retry < 15; retry++) {
+ if (result.size() == 9) {
+ break;
+ }
+ logger.info("wait dynamic partition scheduler, sleep 1s")
+ sleep(1000);
+ result = sql "show partitions from ${tbl}"
+ }
+ assertEquals(9, result.size())
+ for (def line = 0; line < result.size(); line++) {
+ // XXX: DistributionKey at pos(7), next maybe impl by sql meta
+ assertEquals("renamed_k1", result.get(line).get(7))
+ }
+
+ sql "drop table ${tbl}"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org