You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by xu...@apache.org on 2023/06/05 04:20:57 UTC

[doris] branch master updated: [fix](dynamic partition) partition create failed after alter distributed column (#20239)

This is an automated email from the ASF dual-hosted git repository.

xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c28a71378 [fix](dynamic partition) partition create failed after alter distributed column (#20239)
3c28a71378 is described below

commit 3c28a7137830e8bebec826ff7d08215665b8835d
Author: camby <zh...@baidu.com>
AuthorDate: Mon Jun 5 12:20:50 2023 +0800

    [fix](dynamic partition) partition create failed after alter distributed column (#20239)
    
    This PR fixes the following two problems:
    
    Problem 1: altering the comment of a distribution column makes adding a dynamic partition fail (issue #10811).
    
    create a table with a dynamic partition policy;
    restart the FE;
    alter the comment of the distribution column;
    alter dynamic_partition.end so that the dynamic partition scheduler adds a new partition;
    The following error is logged, and the new partition fails to be created:
    dynamic add partition failed: errCode = 2, detailMessage =      Cannot assign hash distribution with different distribution cols. default is: [id int(11) NULL COMMENT 'new_comment_of_id'], db: default_cluster:example_db, table: test_2
    Problem 2: renaming a distribution column makes inserts into existing partitions fail (issue #20405).
    
    The key step in reproducing these problems is restarting the FE.
    
    All versions appear to be affected, including master, lts-1.1, and others.
---
 .../doris/analysis/HashDistributionDesc.java       |  4 +-
 .../main/java/org/apache/doris/catalog/Column.java | 22 ++++++++
 .../main/java/org/apache/doris/catalog/Env.java    | 25 ++++++---
 .../apache/doris/catalog/HashDistributionInfo.java | 14 ++++-
 .../apache/doris/datasource/InternalCatalog.java   | 13 ++---
 regression-test/pipeline/p0/conf/fe.conf           |  2 +
 regression-test/pipeline/p1/conf/fe.conf           |  2 +
 .../test_dynamic_partition_with_alter.groovy       | 56 ++++++++++++++++++++
 .../test_dynamic_partition_with_rename.groovy      | 60 ++++++++++++++++++++++
 9 files changed, 181 insertions(+), 17 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
index d54049f793..374bf10bdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
@@ -131,7 +131,9 @@ public class HashDistributionDesc extends DistributionDesc {
                                 + column.getName() + "].");
                     }
 
-                    distributionColumns.add(column);
+                    // distribution info and base columns are persisted separately inside OlapTable, so deep copy
+                    // the column; otherwise modifying a table column would also modify the distribution column.
+                    distributionColumns.add(new Column(column));
                     find = true;
                     break;
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 615851e257..936db90a35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -784,6 +784,28 @@ public class Column implements Writable, GsonPostProcessable {
                 && Objects.equals(realDefaultValue, other.realDefaultValue);
     }
 
+    // Compare only the attributes relevant to distribution (name, type, key/nullability, ...);
+    // attributes that do not affect data, such as comment, are ignored.
+    public boolean equalsForDistribution(Column other) {
+        if (other == this) {
+            return true;
+        }
+
+        return name.equalsIgnoreCase(other.name)
+                && Objects.equals(getDefaultValue(), other.getDefaultValue())
+                && Objects.equals(aggregationType, other.aggregationType)
+                && isAggregationTypeImplicit == other.isAggregationTypeImplicit
+                && isKey == other.isKey
+                && isAllowNull == other.isAllowNull
+                && getDataType().equals(other.getDataType())
+                && getStrLen() == other.getStrLen()
+                && getPrecision() == other.getPrecision()
+                && getScale() == other.getScale()
+                && visible == other.visible
+                && Objects.equals(children, other.children)
+                && Objects.equals(realDefaultValue, other.realDefaultValue);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index bc3a3b4f76..c64b4a2c74 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4290,12 +4290,26 @@ public class Env {
         // 4. modify distribution info
         DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
         if (distributionInfo.getType() == DistributionInfoType.HASH) {
+            // rename the column inside the table's default distribution info
             List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
             for (Column column : distributionColumns) {
                 if (column.getName().equalsIgnoreCase(colName)) {
                     column.setName(newColName);
                 }
             }
+            // partitions hold their own distribution info (possibly separate column objects); rename there too
+            for (Partition p : table.getPartitions()) {
+                DistributionInfo partDistInfo = p.getDistributionInfo();
+                if (partDistInfo.getType() != DistributionInfoType.HASH) {
+                    continue;
+                }
+                List<Column> partDistColumns = ((HashDistributionInfo) partDistInfo).getDistributionColumns();
+                for (Column column : partDistColumns) {
+                    if (column.getName().equalsIgnoreCase(colName)) {
+                        column.setName(newColName);
+                    }
+                }
+            }
         }
 
         // 5. modify sequence map col
@@ -4546,13 +4560,10 @@ public class Env {
                 }
                 if (distributionInfo.getType() == DistributionInfoType.HASH) {
                     HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
-                    List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
-                    List<Column> defaultDistriCols
-                            = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
-                    if (!newDistriCols.equals(defaultDistriCols)) {
-                        throw new DdlException(
-                                "Cannot assign hash distribution with different distribution cols. " + "default is: "
-                                        + defaultDistriCols);
+                    if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+                        throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+                                + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+                                + ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns());
                     }
                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
index d746bd355f..5f30bc2098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
@@ -95,6 +95,18 @@ public class HashDistributionInfo extends DistributionInfo {
         return distributionInfo;
     }
 
+    // Returns true iff both infos have the same number of distribution columns and each pair,
+    // taken position by position, matches under Column#equalsForDistribution (which ignores
+    // attrs such as comment).
+    public boolean sameDistributionColumns(HashDistributionInfo other) {
+        int n = distributionColumns.size();
+        boolean same = n == other.distributionColumns.size();
+        for (int i = 0; same && i < n; ++i) {
+            same = distributionColumns.get(i).equalsForDistribution(other.distributionColumns.get(i));
+        }
+        return same;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -107,7 +119,7 @@ public class HashDistributionInfo extends DistributionInfo {
             return false;
         }
         HashDistributionInfo that = (HashDistributionInfo) o;
-        return bucketNum == that.bucketNum && Objects.equals(distributionColumns, that.distributionColumns);
+        return bucketNum == that.bucketNum && sameDistributionColumns(that); // keep hashCode() consistent with this
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index d1c0354337..1fc3b20bcc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1410,17 +1410,14 @@ public class InternalCatalog implements CatalogIf<Database> {
 
                 if (distributionInfo.getType() == DistributionInfoType.HASH) {
                     HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
-                    List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
-                    List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
-                            .getDistributionColumns();
-                    if (!newDistriCols.equals(defaultDistriCols)) {
-                        throw new DdlException(
-                                "Cannot assign hash distribution with different distribution cols. " + "default is: "
-                                        + defaultDistriCols);
-                    }
                     if (hashDistributionInfo.getBucketNum() <= 0) {
                         throw new DdlException("Cannot assign hash distribution buckets less than 1");
                     }
+                    if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
+                        throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+                                + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+                                + ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns());
+                    }
                 } else if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
                     RandomDistributionInfo randomDistributionInfo = (RandomDistributionInfo) distributionInfo;
                     if (randomDistributionInfo.getBucketNum() <= 0) {
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 5879c463ee..d2e8cbd282 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -80,3 +80,5 @@ enable_struct_type=true
 
 # enable mtmv
 enable_mtmv = true
+
+dynamic_partition_check_interval_seconds=5
diff --git a/regression-test/pipeline/p1/conf/fe.conf b/regression-test/pipeline/p1/conf/fe.conf
index b7fbf07bcb..ba7de606b5 100644
--- a/regression-test/pipeline/p1/conf/fe.conf
+++ b/regression-test/pipeline/p1/conf/fe.conf
@@ -82,3 +82,5 @@ enable_mtmv = true
 # enable auto collect statistics
 enable_auto_collect_statistics=true
 auto_check_statistics_in_sec=60
+
+dynamic_partition_check_interval_seconds=5
diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy
new file mode 100644
index 0000000000..32cfc742a7
--- /dev/null
+++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_dynamic_partition_with_alter") {
+    def tbl = "test_dynamic_partition_with_alter"
+    sql "drop table if exists ${tbl}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbl}
+        ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+        AGGREGATE KEY(k1,k2)
+        PARTITION BY RANGE(k1) ( )
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "dynamic_partition.enable"="true",
+            "dynamic_partition.end"="3",
+            "dynamic_partition.buckets"="1",
+            "dynamic_partition.start"="-3",
+            "dynamic_partition.prefix"="p",
+            "dynamic_partition.time_unit"="DAY",
+            "dynamic_partition.create_history_partition"="true",
+            "dynamic_partition.replication_allocation" = "tag.location.default: 1")
+        """
+    result = sql "show partitions from ${tbl}"
+    assertEquals(7, result.size())
+
+    // modify the distribution column's comment, then extend dynamic_partition.end to add more partitions
+    sql """ alter table ${tbl} modify column k1 comment 'new_comment_for_k1' """
+    sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
+    sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
+    result = sql "show partitions from ${tbl}"
+    for (def retry = 0; retry < 15; retry++) {
+        if (result.size() == 9) {
+            break;
+        }
+        logger.info("wait dynamic partition scheduler, sleep 1s")
+        sleep(1000);
+        result = sql "show partitions from ${tbl}"
+    }
+    assertEquals(9, result.size())
+
+    sql "drop table ${tbl}"
+}
diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy
new file mode 100644
index 0000000000..b07a2f1a63
--- /dev/null
+++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_dynamic_partition_with_rename") {
+    def tbl = "test_dynamic_partition_with_rename"
+    sql "drop table if exists ${tbl}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbl}
+        ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+        AGGREGATE KEY(k1,k2)
+        PARTITION BY RANGE(k1) ( )
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "dynamic_partition.enable"="true",
+            "dynamic_partition.end"="3",
+            "dynamic_partition.buckets"="1",
+            "dynamic_partition.start"="-3",
+            "dynamic_partition.prefix"="p",
+            "dynamic_partition.time_unit"="DAY",
+            "dynamic_partition.create_history_partition"="true",
+            "dynamic_partition.replication_allocation" = "tag.location.default: 1")
+        """
+    result = sql "show partitions from ${tbl}"
+    assertEquals(7, result.size())
+
+    // rename the distribution column, then extend dynamic_partition.end to add more partitions
+    sql "alter table ${tbl} rename column k1 renamed_k1"
+    sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
+    sql """ alter table ${tbl} set('dynamic_partition.end'='5') """
+    result = sql "show partitions from ${tbl}"
+    for (def retry = 0; retry < 15; retry++) {
+        if (result.size() == 9) {
+            break;
+        }
+        logger.info("wait dynamic partition scheduler, sleep 1s")
+        sleep(1000);
+        result = sql "show partitions from ${tbl}"
+    }
+    assertEquals(9, result.size())
+    for (def line = 0; line < result.size(); line++) {
+        // XXX: DistributionKey is column 7 of SHOW PARTITIONS output; could be looked up via result metadata
+        assertEquals("renamed_k1", result.get(line).get(7))
+    }
+
+    sql "drop table ${tbl}"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org