Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/03 06:03:49 UTC

[doris] branch master updated: [feature](colocate) support cross database colocate join (#18152)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ecd3fd07f6 [feature](colocate) support cross database colocate join (#18152)
ecd3fd07f6 is described below

commit ecd3fd07f6497cb39334f1ed103efc29dae2a326
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Mon Apr 3 14:03:42 2023 +0800

    [feature](colocate) support cross database colocate join (#18152)
---
 .../advanced/join-optimization/colocation-join.md  |  19 ++
 .../advanced/join-optimization/colocation-join.md  |  21 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../apache/doris/catalog/ColocateGroupSchema.java  |   7 +-
 .../apache/doris/catalog/ColocateTableIndex.java   |  74 ++++++-
 .../main/java/org/apache/doris/catalog/Env.java    |  27 +--
 .../clone/ColocateTableCheckerAndBalancer.java     |  25 ++-
 .../apache/doris/common/util/PropertyAnalyzer.java |   1 +
 .../apache/doris/datasource/InternalCatalog.java   |   8 +-
 .../org/apache/doris/journal/JournalEntity.java    |   3 +-
 .../apache/doris/persist/ColocatePersistInfo.java  |   8 -
 .../apache/doris/persist/TablePropertyInfo.java    |  46 +++--
 .../apache/doris/planner/DistributedPlanner.java   |   2 +-
 .../apache/doris/catalog/ColocateTableTest.java    |   2 +-
 .../org/apache/doris/planner/ColocatePlanTest.java | 217 +++++++++++++++------
 .../apache/doris/utframe/TestWithFeService.java    |   6 +
 .../data/correctness_p0/test_colocate_join.out     |  22 +++
 .../correctness_p0/test_colocate_join.groovy       | 111 +++++++++++
 18 files changed, 476 insertions(+), 127 deletions(-)
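
Note on the naming rule: the diff below centralizes group-name resolution in
GroupId.getFullGroupName and GroupId.isGlobalGroupName. The following is a
minimal standalone sketch of that rule, not the committed code. The class name
GroupNameSketch and the database ids 10001 and 10002 are hypothetical and used
only for illustration.

```java
// Sketch only: mirrors the two static helpers this commit adds to
// ColocateTableIndex.GroupId. GroupNameSketch is a hypothetical class name.
final class GroupNameSketch {
    static final String GLOBAL_COLOCATE_PREFIX = "__global__";

    // A global group keeps its name unchanged; a database-scoped group
    // is stored under the full name dbId_groupName.
    public static String getFullGroupName(long dbId, String colocateGroup) {
        if (colocateGroup.startsWith(GLOBAL_COLOCATE_PREFIX)) {
            return colocateGroup;
        }
        return dbId + "_" + colocateGroup;
    }

    public static boolean isGlobalGroupName(String groupName) {
        return groupName.startsWith(GLOBAL_COLOCATE_PREFIX);
    }

    public static void main(String[] args) {
        // Hypothetical database ids: tables in two databases resolve to
        // the same full name when the group is global.
        System.out.println(getFullGroupName(10001L, "__global__group1"));
        System.out.println(getFullGroupName(10002L, "__global__group1"));
        // The same plain name in two databases yields two distinct groups.
        System.out.println(getFullGroupName(10001L, "group1"));
        System.out.println(getFullGroupName(10002L, "group1"));
    }
}
```

Because a global name resolves to the same key regardless of database, the
group id can no longer carry a single dbId, which appears to be why the commit
records a per-table database id in tblId2DbId.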

diff --git a/docs/en/docs/advanced/join-optimization/colocation-join.md b/docs/en/docs/advanced/join-optimization/colocation-join.md
index fd67ee5a8a..15357c2913 100644
--- a/docs/en/docs/advanced/join-optimization/colocation-join.md
+++ b/docs/en/docs/advanced/join-optimization/colocation-join.md
@@ -93,6 +93,25 @@ PROPERTIES(
 If the specified group does not exist, Doris automatically creates a group that contains only the current table. If the Group already exists, Doris checks whether the current table satisfies the Colocation Group Schema. If satisfied, the table is created and added to the Group. At the same time, tables create fragments and replicas based on existing data distribution rules in Groups.
 Group belongs to a database, and its name is unique in a database. Internal storage is the full name of Group `dbId_groupName`, but users only perceive groupName.
 
+<version since="dev">
+
+In version 2.0, Doris supports cross-Database Groups. When creating a table, use the keyword `__global__` as the prefix of the Group name. For example:
+
+```
+CREATE TABLE tbl (k1 int, v1 int sum)
+DISTRIBUTED BY HASH(k1)
+BUCKETS 8
+PROPERTIES(
+     "colocate_with" = "__global__group1"
+);
+```
+
+The Group prefixed with `__global__` no longer belongs to a Database, and its name is also globally unique.
+
+Cross-Database Colocate Join can be realized by creating a Global Group.
+
+</version>
+
 ### Delete table
 
 When the last table in Group is deleted completely (deleting completely means deleting from the recycle bin). Usually, when a table is deleted by the `DROP TABLE` command, it will be deleted after the default one-day stay in the recycle bin, and the group will be deleted automatically.
diff --git a/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md b/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md
index c37eb7bbec..e357683dc8 100644
--- a/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md
+++ b/docs/zh-CN/docs/advanced/join-optimization/colocation-join.md
@@ -90,6 +90,25 @@ PROPERTIES(
 
 如果指定的 Group 不存在,则 Doris 会自动创建一个只包含当前这张表的 Group。如果 Group 已存在,则 Doris 会检查当前表是否满足 Colocation Group Schema。如果满足,则会创建该表,并将该表加入 Group。同时,表会根据已存在的 Group 中的数据分布规则创建分片和副本。 Group 归属于一个 Database,Group 的名字在一个 Database 内唯一。在内部存储是 Group 的全名为 `dbId_groupName`,但用户只感知 groupName。
 
+<version since="dev">
+
+2.0 版本中,Doris 支持了跨Database的 Group。在建表时,需使用关键词 `__global__` 作为 Group 名称的前缀。如:
+
+```
+CREATE TABLE tbl (k1 int, v1 int sum)
+DISTRIBUTED BY HASH(k1)
+BUCKETS 8
+PROPERTIES(
+    "colocate_with" = "__global__group1"
+);
+```
+
+`__global__` 前缀的 Group 不再归属于一个 Database,其名称也是全局唯一的。
+
+通过创建 Global Group,可以实现跨 Database 的 Colocate Join。
+
+</version>
+
 ### 删表
 
 当 Group 中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过 `DROP TABLE` 命令删除后,会在回收站默认停留一天的时间后,再删除),该 Group 也会被自动删除。
@@ -408,4 +427,4 @@ Doris 提供了几个和 Colocation Join 有关的 HTTP Restful API,用于查
 
    其中 Body 是以嵌套数组表示的 BucketsSequence 以及每个 Bucket 中分片分布所在 BE 的 id。
 
-   注意,使用该命令,可能需要将 FE 的配置 `disable_colocate_relocate` 和 `disable_colocate_balance` 设为 true。即关闭系统自动的 Colocation 副本修复和均衡。否则可能在修改后,会被系统自动重置。
\ No newline at end of file
+   注意,使用该命令,可能需要将 FE 的配置 `disable_colocate_relocate` 和 `disable_colocate_balance` 设为 true。即关闭系统自动的 Colocation 副本修复和均衡。否则可能在修改后,会被系统自动重置。
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 6f6b6740b4..e220b64fb0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -56,9 +56,11 @@ public final class FeMetaVersion {
     public static final int VERSION_117 = 117;
     // change frontend meta to json, add hostname to MasterInfo
     public static final int VERSION_118 = 118;
+    // TablePropertyInfo add db id
+    public static final int VERSION_119 = 119;
 
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_118;
+    public static final int VERSION_CURRENT = VERSION_119;
 
     // all logs meta version should >= the minimum version, so that we could remove many if clause, for example
     // if (FE_METAVERSION < VERSION_94) ...
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
index ff1c957cd4..09e9437a35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
@@ -90,6 +90,11 @@ public class ColocateGroupSchema implements Writable {
             // distribution col type
             for (int i = 0; i < distributionColTypes.size(); i++) {
                 Type targetColType = distributionColTypes.get(i);
+                // varchar and string columns have the same distribution hash value if their data is the same
+                if (targetColType.isVarcharOrStringType() && info.getDistributionColumns().get(i).getType()
+                        .isVarcharOrStringType()) {
+                    continue;
+                }
                 if (!targetColType.equals(info.getDistributionColumns().get(i).getType())) {
                     ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_TYPE,
                             info.getDistributionColumns().get(i).getName(), targetColType);
@@ -98,7 +103,7 @@ public class ColocateGroupSchema implements Writable {
         }
     }
 
-    public void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException {
+    private void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException {
         for (ReplicaAllocation replicaAlloc : partitionInfo.idToReplicaAllocation.values()) {
             if (!replicaAlloc.equals(this.replicaAlloc)) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_REPLICATION_ALLOCATION,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 80144d3d61..e0e327c2c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.ColocatePersistInfo;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.resource.Tag;
 
@@ -52,16 +53,23 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
- * maintain the colocate table related indexes and meta
+ * maintain the colocation table related indexes and meta
  */
 public class ColocateTableIndex implements Writable {
     private static final Logger LOG = LogManager.getLogger(ColocateTableIndex.class);
 
-    public static class GroupId implements Writable {
+    public static class GroupId implements Writable, GsonPostProcessable {
+        public static final String GLOBAL_COLOCATE_PREFIX = "__global__";
+
         @SerializedName(value = "dbId")
         public Long dbId;
         @SerializedName(value = "grpId")
         public Long grpId;
+        // only available when dbId = 0
+        // because for global colocate table, the dbId is 0, so we do not know which db the table belongs to,
+        // so we use tblId2DbId to record the dbId of each table
+        @SerializedName(value = "tblId2DbId")
+        private Map<Long, Long> tblId2DbId = Maps.newHashMap();
 
         private GroupId() {
         }
@@ -71,6 +79,23 @@ public class ColocateTableIndex implements Writable {
             this.grpId = grpId;
         }
 
+        public void addTblId2DbId(long tblId, long dbId) {
+            Preconditions.checkState(this.dbId == 0);
+            tblId2DbId.put(tblId, dbId);
+        }
+
+        public void removeTblId2DbId(long tblId) {
+            tblId2DbId.remove(tblId);
+        }
+
+        public long getDbIdByTblId(long tblId) {
+            return tblId2DbId.get(tblId);
+        }
+
+        public int getTblId2DbIdSize() {
+            return tblId2DbId.size();
+        }
+
         public static GroupId read(DataInput in) throws IOException {
             if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
                 GroupId groupId = new GroupId();
@@ -102,6 +127,13 @@ public class ColocateTableIndex implements Writable {
             return dbId.equals(other.dbId) && grpId.equals(other.grpId);
         }
 
+        @Override
+        public void gsonPostProcess() throws IOException {
+            if (tblId2DbId == null) {
+                tblId2DbId = Maps.newHashMap();
+            }
+        }
+
         @Override
         public int hashCode() {
             int result = 17;
@@ -114,6 +146,18 @@ public class ColocateTableIndex implements Writable {
         public String toString() {
             return dbId + "." + grpId;
         }
+
+        public static String getFullGroupName(long dbId, String colocateGroup) {
+            if (colocateGroup.startsWith(GLOBAL_COLOCATE_PREFIX)) {
+                return colocateGroup;
+            } else {
+                return dbId + "_" + colocateGroup;
+            }
+        }
+
+        public static boolean isGlobalGroupName(String groupName) {
+            return groupName.startsWith(GLOBAL_COLOCATE_PREFIX);
+        }
     }
 
     // group_name -> group_id
@@ -155,11 +199,10 @@ public class ColocateTableIndex implements Writable {
 
     // NOTICE: call 'addTableToGroup()' will not modify 'group2BackendsPerBucketSeq'
     // 'group2BackendsPerBucketSeq' need to be set manually before or after, if necessary.
-    public GroupId addTableToGroup(long dbId, OlapTable tbl, String groupName, GroupId assignedGroupId) {
+    public GroupId addTableToGroup(long dbId, OlapTable tbl, String fullGroupName, GroupId assignedGroupId) {
         writeLock();
         try {
             GroupId groupId = null;
-            String fullGroupName = dbId + "_" + groupName;
             if (groupName2Id.containsKey(fullGroupName)) {
                 groupId = groupName2Id.get(fullGroupName);
             } else {
@@ -168,7 +211,11 @@ public class ColocateTableIndex implements Writable {
                     groupId = assignedGroupId;
                 } else {
                     // generate a new one
-                    groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId());
+                    if (GroupId.isGlobalGroupName(fullGroupName)) {
+                        groupId = new GroupId(0, Env.getCurrentEnv().getNextId());
+                    } else {
+                        groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId());
+                    }
                 }
                 HashDistributionInfo distributionInfo = (HashDistributionInfo) tbl.getDefaultDistributionInfo();
                 ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId,
@@ -178,6 +225,10 @@ public class ColocateTableIndex implements Writable {
                 group2Schema.put(groupId, groupSchema);
                 group2ErrMsgs.put(groupId, "");
             }
+            // for global colocate table, dbId is 0, and we need to save the real dbId of the table
+            if (groupId.dbId == 0) {
+                groupId.addTblId2DbId(tbl.getId(), dbId);
+            }
             group2Tables.put(groupId, tbl.getId());
             table2Group.put(tbl.getId(), groupId);
             return groupId;
@@ -252,6 +303,7 @@ public class ColocateTableIndex implements Writable {
             }
 
             GroupId groupId = table2Group.remove(tableId);
+            groupId.removeTblId2DbId(tableId);
             group2Tables.remove(groupId, tableId);
             if (!group2Tables.containsKey(groupId)) {
                 // all tables of this group are removed, remove the group
@@ -514,14 +566,19 @@ public class ColocateTableIndex implements Writable {
                 // remove from old group
                 removeTable(tbl.getId());
             }
-            return addTableToGroup(dbId, tbl, newGroup, assignedGroupId);
+            String fullNewGroupName = GroupId.getFullGroupName(dbId, newGroup);
+            return addTableToGroup(dbId, tbl, fullNewGroupName, assignedGroupId);
         } finally {
             writeUnlock();
         }
     }
 
     public void replayAddTableToGroup(ColocatePersistInfo info) throws MetaNotFoundException {
-        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getGroupId().dbId);
+        long dbId = info.getGroupId().dbId;
+        if (dbId == 0) {
+            dbId = info.getGroupId().getDbIdByTblId(info.getTableId());
+        }
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         OlapTable tbl = (OlapTable) db.getTableOrMetaException(info.getTableId(),
                 org.apache.doris.catalog.Table.TableType.OLAP);
         writeLock();
@@ -530,7 +587,8 @@ public class ColocateTableIndex implements Writable {
             for (Map.Entry<Tag, List<List<Long>>> entry : map.entrySet()) {
                 group2BackendsPerBucketSeq.put(info.getGroupId(), entry.getKey(), entry.getValue());
             }
-            addTableToGroup(info.getGroupId().dbId, tbl, tbl.getColocateGroup(), info.getGroupId());
+            String fullGroupName = GroupId.getFullGroupName(dbId, tbl.getColocateGroup());
+            addTableToGroup(dbId, tbl, fullGroupName, info.getGroupId());
         } finally {
             writeUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 632ea3daab..579bc51677 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3885,23 +3885,23 @@ public class Env {
     }
 
     // the invoker should keep table's write lock
-    public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay,
+    public void modifyTableColocate(Database db, OlapTable table, String assignedGroup, boolean isReplay,
             GroupId assignedGroupId)
             throws DdlException {
 
         String oldGroup = table.getColocateGroup();
         GroupId groupId = null;
-        if (!Strings.isNullOrEmpty(colocateGroup)) {
-            String fullGroupName = db.getId() + "_" + colocateGroup;
+        if (!Strings.isNullOrEmpty(assignedGroup)) {
+            String fullAssignedGroupName = GroupId.getFullGroupName(db.getId(), assignedGroup);
             //When the new name is the same as the old name, we return it to prevent npe
             if (!Strings.isNullOrEmpty(oldGroup)) {
-                String oldFullGroupName = db.getId() + "_" + oldGroup;
-                if (oldFullGroupName.equals(fullGroupName)) {
+                String oldFullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
+                if (oldFullGroupName.equals(fullAssignedGroupName)) {
                     LOG.warn("modify table[{}] group name same as old group name,skip.", table.getName());
                     return;
                 }
             }
-            ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
+            ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullAssignedGroupName);
             if (groupSchema == null) {
                 // user set a new colocate group,
                 // check if all partitions all this table has same buckets num and same replication number
@@ -3938,7 +3938,7 @@ public class Env {
                 backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq();
             }
             // change group after getting backends sequence(if has), in case 'getArbitraryTabletBucketsSeq' failed
-            groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, colocateGroup, assignedGroupId);
+            groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, assignedGroup, assignedGroupId);
 
             if (groupSchema == null) {
                 Preconditions.checkNotNull(backendsPerBucketSeq);
@@ -3948,7 +3948,7 @@ public class Env {
             // set this group as unstable
             colocateTableIndex.markGroupUnstable(groupId, "Colocation group modified by user",
                     false /* edit log is along with modify table log */);
-            table.setColocateGroup(colocateGroup);
+            table.setColocateGroup(assignedGroup);
         } else {
             // unset colocation group
             if (Strings.isNullOrEmpty(oldGroup)) {
@@ -3957,17 +3957,16 @@ public class Env {
             }
 
             // when replayModifyTableColocate, we need the groupId info
-            String fullGroupName = db.getId() + "_" + oldGroup;
+            String fullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
             groupId = colocateTableIndex.getGroupSchema(fullGroupName).getGroupId();
-
             colocateTableIndex.removeTable(table.getId());
             table.setColocateGroup(null);
         }
 
         if (!isReplay) {
             Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
-            properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateGroup);
-            TablePropertyInfo info = new TablePropertyInfo(table.getId(), groupId, properties);
+            properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, assignedGroup);
+            TablePropertyInfo info = new TablePropertyInfo(db.getId(), table.getId(), groupId, properties);
             editLog.logModifyTableColocate(info);
         }
         LOG.info("finished modify table's colocation property. table: {}, is replay: {}", table.getName(), isReplay);
@@ -3975,6 +3974,10 @@ public class Env {
 
     public void replayModifyTableColocate(TablePropertyInfo info) throws MetaNotFoundException {
         long dbId = info.getGroupId().dbId;
+        if (dbId == 0) {
+            dbId = info.getDbId();
+        }
+        Preconditions.checkState(dbId != 0, "replay modify table colocate failed, table id: " + info.getTableId());
         long tableId = info.getTableId();
         Map<String, String> properties = info.getPropertyMap();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 22d08c0718..37030dc088 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -146,11 +146,6 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
         // get all groups
         Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
         for (GroupId groupId : groupIds) {
-            Database db = env.getInternalCatalog().getDbNullable(groupId.dbId);
-            if (db == null) {
-                continue;
-            }
-
             Table<String, Tag, ClusterLoadStatistic> statisticMap = env.getTabletScheduler().getStatisticMap();
             if (statisticMap == null) {
                 continue;
@@ -159,7 +154,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
             ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
             ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
             try {
-                Env.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
+                Env.getCurrentSystemInfo().checkReplicaAllocation(SystemInfoService.DEFAULT_CLUSTER, replicaAlloc);
             } catch (DdlException e) {
                 colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
                 continue;
@@ -168,7 +163,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
 
             for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
                 Tag tag = entry.getKey();
-                ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName(), tag);
+                ClusterLoadStatistic statistic = statisticMap.get(SystemInfoService.DEFAULT_CLUSTER, tag);
                 if (statistic == null) {
                     continue;
                 }
@@ -182,7 +177,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                         infoService, colocateIndex, groupId, tag);
                 // get all available backends for this group
                 Set<Long> beIdsInOtherTag = colocateIndex.getBackendIdsExceptForTag(groupId, tag);
-                List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), tag, beIdsInOtherTag, infoService);
+                List<Long> availableBeIds = getAvailableBeIds(SystemInfoService.DEFAULT_CLUSTER, tag, beIdsInOtherTag,
+                        infoService);
                 // try relocate or balance this group for specified tag
                 List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
                 if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex,
@@ -214,11 +210,6 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
         Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
         for (GroupId groupId : groupIds) {
             List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
-            Database db = env.getInternalCatalog().getDbNullable(groupId.dbId);
-            if (db == null) {
-                continue;
-            }
-
             List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
             if (backendBucketsSeq.isEmpty()) {
                 continue;
@@ -227,6 +218,14 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
             String unstableReason = null;
             OUT:
             for (Long tableId : tableIds) {
+                long dbId = groupId.dbId;
+                if (dbId == 0) {
+                    dbId = groupId.getDbIdByTblId(tableId);
+                }
+                Database db = env.getInternalCatalog().getDbNullable(dbId);
+                if (db == null) {
+                    continue;
+                }
                 OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
                 if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
                     continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 1a4d212059..137630b516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -435,6 +435,7 @@ public class PropertyAnalyzer {
         return bfFpp;
     }
 
+    // analyze the colocation properties of table
     public static String analyzeColocate(Map<String, String> properties) throws AnalysisException {
         String colocateGroup = null;
         if (properties != null && properties.containsKey(PROPERTIES_COLOCATE_WITH)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 456eee8cf2..e2be6ff0a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1443,7 +1443,7 @@ public class InternalCatalog implements CatalogIf<Database> {
 
             // check colocation
             if (Env.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
-                String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
+                String fullGroupName = GroupId.getFullGroupName(db.getId(), olapTable.getColocateGroup());
                 ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
                 Preconditions.checkNotNull(groupSchema);
                 groupSchema.checkDistribution(distributionInfo);
@@ -2066,7 +2066,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
                     throw new AnalysisException("Random distribution for colocate table is unsupported");
                 }
-                String fullGroupName = db.getId() + "_" + colocateGroup;
+                String fullGroupName = GroupId.getFullGroupName(db.getId(), colocateGroup);
                 ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
                 if (groupSchema != null) {
                     // group already exist, check if this table can be added to this group
@@ -2075,7 +2075,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 }
                 // add table to this group, if group does not exist, create a new one
                 Env.getCurrentColocateIndex()
-                        .addTableToGroup(db.getId(), olapTable, colocateGroup, null /* generate group id inside */);
+                        .addTableToGroup(db.getId(), olapTable, fullGroupName, null /* generate group id inside */);
                 olapTable.setColocateGroup(colocateGroup);
             }
         } catch (AnalysisException e) {
@@ -2277,7 +2277,7 @@ public class InternalCatalog implements CatalogIf<Database> {
 
             if (result.second) {
                 if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                    // if this is a colocate join table, its table id is already added to colocate group
+                    // if this is a colocate table, its table id is already added to colocate group
                     // so we should remove the tableId here
                     Env.getCurrentColocateIndex().removeTable(tableId);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 74e4644fd9..9c08ff555e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -485,8 +485,7 @@ public class JournalEntity implements Writable {
                 break;
             }
             case OperationType.OP_MODIFY_TABLE_COLOCATE: {
-                data = new TablePropertyInfo();
-                ((TablePropertyInfo) data).readFields(in);
+                data = TablePropertyInfo.read(in);
                 isRead = true;
                 break;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
index 408839c616..459be64605 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
@@ -46,10 +46,6 @@ public class ColocatePersistInfo implements Writable {
     @SerializedName(value = "backendsPerBucketSeq")
     private Map<Tag, List<List<Long>>> backendsPerBucketSeq = Maps.newHashMap();
 
-    public ColocatePersistInfo() {
-
-    }
-
     private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
         this.groupId = groupId;
         this.tableId = tableId;
@@ -74,10 +70,6 @@ public class ColocatePersistInfo implements Writable {
         return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
     }
 
-    public static ColocatePersistInfo createForRemoveTable(long tableId) {
-        return new ColocatePersistInfo(new GroupId(-1, -1), tableId, Maps.newHashMap());
-    }
-
     public static ColocatePersistInfo read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ColocatePersistInfo.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java
index 0efbe65db6..959ccaf9c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TablePropertyInfo.java
@@ -18,10 +18,14 @@
 package org.apache.doris.persist;
 
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -32,15 +36,20 @@ import java.util.Map;
  * PersistInfo for Table properties
  */
 public class TablePropertyInfo implements Writable {
+    @SerializedName(value = "dbId")
+    private long dbId;
+    @SerializedName(value = "tableId")
     private long tableId;
+    @SerializedName(value = "propertyMap")
     private Map<String, String> propertyMap;
+    @SerializedName(value = "groupId")
     private GroupId groupId;
 
-    public TablePropertyInfo() {
-
+    private TablePropertyInfo() {
     }
 
-    public TablePropertyInfo(long tableId, GroupId groupId, Map<String, String> propertyMap) {
+    public TablePropertyInfo(long dbId, long tableId, GroupId groupId, Map<String, String> propertyMap) {
+        this.dbId = dbId;
         this.tableId = tableId;
         this.groupId = groupId;
         this.propertyMap = propertyMap;
@@ -50,6 +59,10 @@ public class TablePropertyInfo implements Writable {
         return propertyMap;
     }
 
+    public long getDbId() {
+        return dbId;
+    }
+
     public long getTableId() {
         return tableId;
     }
@@ -60,22 +73,22 @@ public class TablePropertyInfo implements Writable {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(tableId);
-        if (groupId == null) {
-            out.writeBoolean(false);
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static TablePropertyInfo read(DataInput in) throws IOException {
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_119) {
+            TablePropertyInfo info = new TablePropertyInfo();
+            info.readFields(in);
+            return info;
         } else {
-            out.writeBoolean(true);
-            groupId.write(out);
-        }
-        int size = propertyMap.size();
-        out.writeInt(size);
-        for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
-            Text.writeString(out, kv.getKey());
-            Text.writeString(out, kv.getValue());
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, TablePropertyInfo.class);
         }
     }
 
-    public void readFields(DataInput in) throws IOException {
+    @Deprecated
+    private void readFields(DataInput in) throws IOException {
         tableId = in.readLong();
         if (in.readBoolean()) {
             groupId = GroupId.read(in);
@@ -102,13 +115,14 @@ public class TablePropertyInfo implements Writable {
 
         TablePropertyInfo info = (TablePropertyInfo) obj;
 
-        return tableId == info.tableId && groupId.equals(info.groupId)
+        return dbId == info.dbId && tableId == info.tableId && groupId.equals(info.groupId)
                 && propertyMap.equals(info.propertyMap);
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
+        sb.append(" db id: ").append(dbId);
         sb.append(" table id: ").append(tableId);
         sb.append(" group id: ").append(groupId);
         sb.append(" propertyMap: ").append(propertyMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 0537ec5bab..43e911674a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -592,7 +592,7 @@ public class DistributedPlanner {
                 }
             }
 
-            //3 the join columns should contains all distribute columns to enable colocate join
+            //3 the join columns should contain all distribute columns to enable colocate join
             if (leftJoinColumns.containsAll(leftDistributeColumns)
                     && rightJoinColumns.containsAll(rightDistributeColumns)) {
                 return true;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
index 404ff80aeb..7e0e2baacd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
@@ -129,7 +129,7 @@ public class ColocateTableTest {
         Map<Tag, List<List<Long>>> backendIds = index.getBackendsPerBucketSeq(groupId);
         Assert.assertEquals(1, backendIds.get(Tag.DEFAULT_BACKEND_TAG).get(0).size());
 
-        String fullGroupName = dbId + "_" + groupName;
+        String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
         Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName));
         ColocateGroupSchema groupSchema = index.getGroupSchema(fullGroupName);
         Assert.assertNotNull(groupSchema);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
index 7583c4cd69..ceda6e8260 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
@@ -17,66 +17,61 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.CreateDbStmt;
-import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.jmockit.Deencapsulation;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QueryStatisticsItem;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.utframe.UtFrameUtils;
+import org.apache.doris.utframe.TestWithFeService;
 
 import org.apache.commons.lang.StringUtils;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import java.io.File;
 import java.util.List;
-import java.util.UUID;
 
-public class ColocatePlanTest {
+public class ColocatePlanTest extends TestWithFeService {
     public static final String COLOCATE_ENABLE = "COLOCATE";
-    private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
-    private static ConnectContext ctx;
+    private static final String GLOBAL_GROUP = "__global__group1";
+    private static final String GLOBAL_GROUP2 = "__global__group2";
 
-    @BeforeClass
-    public static void setUp() throws Exception {
+    @Override
+    protected void runBeforeAll() throws Exception {
         FeConstants.runningUnitTest = true;
-        UtFrameUtils.createDorisCluster(runningDir, 2);
-        ctx = UtFrameUtils.createDefaultCtx();
-        String createDbStmtStr = "create database db1;";
-        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
-        Env.getCurrentEnv().createDb(createDbStmt);
-        // create table test_colocate (k1 int ,k2 int, k3 int, k4 int)
-        // distributed by hash(k1, k2) buckets 10
-        // properties ("replication_num" = "2");
-        String createColocateTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) "
+        createDatabase("db1");
+        createTable("create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) "
                 + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
-                + "'colocate_with' = 'group1');";
-        CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
-        Env.getCurrentEnv().createTable(createColocateTableStmt);
-        String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)"
+                + "'colocate_with' = 'group1');");
+        createTable("create table db1.test(k1 int, k2 int, k3 int, k4 int)"
                 + "partition by range(k1) (partition p1 values less than (\"1\"), partition p2 values less than (\"2\"))"
-                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')";
-        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
-        Env.getCurrentEnv().createTable(createTableStmt);
-
-        String createMultiPartitionTableStmt = "create table db1.test_multi_partition(k1 int, k2 int)"
+                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')");
+        createTable("create table db1.test_multi_partition(k1 int, k2 int)"
                 + "partition by range(k1) (partition p1 values less than(\"1\"), partition p2 values less than (\"2\"))"
-                + "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')";
-        CreateTableStmt createMultiTableStmt = (CreateTableStmt) UtFrameUtils
-                .parseAndAnalyzeStmt(createMultiPartitionTableStmt, ctx);
-        Env.getCurrentEnv().createTable(createMultiTableStmt);
+                + "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')");
+
+        // global colocate tables
+        createDatabase("db2");
+        createTable("create table db1.test_global_colocate1(k1 varchar(10), k2 int, k3 int, k4 int) "
+                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+                + "'colocate_with' = '" + GLOBAL_GROUP + "');");
+        createTable("create table db2.test_global_colocate2(k1 varchar(20), k2 int, k3 int) "
+                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+                + "'colocate_with' = '" + GLOBAL_GROUP + "');");
+        createTable("create table db2.test_global_colocate3(k1 varchar(20), k2 int, k3 date) "
+                + "partition by range(k3) (partition p1 values less than(\"2020-01-01\"), partition p2 values less than (\"2020-02-01\")) "
+                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+                + "'colocate_with' = '" + GLOBAL_GROUP + "');");
     }
 
-    @AfterClass
-    public static void tearDown() {
-        File file = new File(runningDir);
-        file.delete();
+    @Override
+    protected int backendNum() {
+        return 2;
     }
 
     // without
@@ -84,9 +79,9 @@ public class ColocatePlanTest {
     // 2. join: src data has been redistributed
     @Test
     public void sqlDistributedSmallerThanData1() throws Exception {
-        String sql = "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b "
-                + "where a.k1=b.k1";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String plan1 = getSQLPlanOrErrorMsg(
+                "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b "
+                        + "where a.k1=b.k1");
         Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA));
     }
@@ -96,7 +91,7 @@ public class ColocatePlanTest {
     public void sqlDistributedSmallerThanData2() throws Exception {
         String sql = "explain select * from (select k1 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
                 + "where a.k1=b.k1";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY));
     }
 
@@ -105,9 +100,10 @@ public class ColocatePlanTest {
     // 2. hash columns = agg output columns = distributed columns
     @Test
     public void sqlAggAndJoinSameAsTableMeta() throws Exception {
-        String sql = "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
-                + "where a.k1=b.k1 and a.k2=b.k2";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String sql =
+                "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
+                        + "where a.k1=b.k1 and a.k2=b.k2";
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
     }
@@ -119,7 +115,7 @@ public class ColocatePlanTest {
     public void sqlAggAndJoinMoreThanTableMeta() throws Exception {
         String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , "
                 + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 and a.k3=b.k3";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
     }
@@ -131,7 +127,7 @@ public class ColocatePlanTest {
     public void sqlAggMoreThanTableMeta() throws Exception {
         String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , "
                 + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
     }
@@ -144,7 +140,7 @@ public class ColocatePlanTest {
     @Test
     public void sqlAggWithNonColocateTable() throws Exception {
         String sql = "explain select k1, k2 from db1.test group by k1, k2";
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertFalse(plan1.contains(COLOCATE_ENABLE));
     }
@@ -156,7 +152,7 @@ public class ColocatePlanTest {
     @Test
     public void sqlAggWithColocateTable() throws Exception {
         String sql = "select k1, k2, count(*) from db1.test_multi_partition where k2 = 1 group by k1, k2";
-        StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql);
+        StmtExecutor executor = getSqlStmtExecutor(sql);
         Planner planner = executor.planner();
         Coordinator coordinator = Deencapsulation.getField(executor, "coord");
         List<ScanNode> scanNodeList = planner.getScanNodes();
@@ -173,8 +169,9 @@ public class ColocatePlanTest {
 
     @Test
     public void checkColocatePlanFragment() throws Exception {
-        String sql = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;";
-        StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql);
+        String sql
+                = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;";
+        StmtExecutor executor = getSqlStmtExecutor(sql);
         Planner planner = executor.planner();
         Coordinator coordinator = Deencapsulation.getField(executor, "coord");
         boolean isColocateFragment0 = Deencapsulation.invoke(coordinator, "isColocateFragment",
@@ -190,18 +187,120 @@ public class ColocatePlanTest {
     public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception {
         String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) "
                 + "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');";
-        CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
-        Env.getCurrentEnv().createTable(createColocateTableStmt);
-
+        createTable(createColocateTblStmtStr);
         String sql = "select a.k1, a.k2, sum(a.k3) "
                 + "from db1.test_colocate_one_backend a join[shuffle] db1.test_colocate_one_backend b on a.k1=b.k1 "
                 + "group by rollup(a.k1, a.k2);";
-        Deencapsulation.setField(ctx.getSessionVariable(), "parallelExecInstanceNum", 2);
-        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        Deencapsulation.setField(connectContext.getSessionVariable(), "parallelExecInstanceNum", 2);
+        String plan1 = getSQLPlanOrErrorMsg(sql);
         Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
         Assert.assertEquals(5, StringUtils.countMatches(plan1, "PLAN FRAGMENT"));
-
     }
 
+    @Test
+    public void testGlobalColocateGroup() throws Exception {
+        Database db1 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db1");
+        Database db2 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db2");
+        OlapTable tbl1 = (OlapTable) db1.getTableNullable("test_global_colocate1");
+        OlapTable tbl2 = (OlapTable) db2.getTableNullable("test_global_colocate2");
+        OlapTable tbl3 = (OlapTable) db2.getTableNullable("test_global_colocate3");
+
+        String sql = "explain select * from (select k1, k2 from "
+                + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate2 b "
+                + "where a.k1=b.k1 and a.k2=b.k2";
+        String plan1 = getSQLPlanOrErrorMsg(sql);
+        Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
+        Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
+        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+        ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(
+                GroupId.getFullGroupName(1000, GLOBAL_GROUP));
+        Assert.assertNotNull(groupSchema);
+        GroupId groupId = groupSchema.getGroupId();
+        List<Long> tableIds = colocateTableIndex.getAllTableIds(groupId);
+        Assert.assertEquals(3, tableIds.size());
+        Assert.assertTrue(tableIds.contains(tbl1.getId()));
+        Assert.assertTrue(tableIds.contains(tbl2.getId()));
+        Assert.assertTrue(tableIds.contains(tbl3.getId()));
+        Assert.assertEquals(3, groupId.getTblId2DbIdSize());
+        Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId()));
+        Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId()));
+        Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl3.getId()));
+
+        sql = "explain select * from (select k1, k2 from "
+                + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate3 b "
+                + "where a.k1=b.k1 and a.k2=b.k2";
+        plan1 = getSQLPlanOrErrorMsg(sql);
+        Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
+        Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
+
+        String addPartitionStmt
+                = "alter table db2.test_global_colocate3 add partition p3 values less than (\"2020-03-01\");";
+        alterTableSync(addPartitionStmt);
+
+        try {
+            createTable("create table db1.test_global_colocate4(k1 int, k2 int, k3 int, k4 int) "
+                    + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+                    + "'colocate_with' = '" + GLOBAL_GROUP + "');");
+            Assert.fail();
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.assertTrue(
+                    e.getMessage().contains("Colocate tables distribution columns must have the same data type"));
+            List<Long> tmpTableIds = colocateTableIndex.getAllTableIds(groupId);
+            Assert.assertEquals(3, tmpTableIds.size());
+            Assert.assertTrue(tmpTableIds.contains(tbl1.getId()));
+            Assert.assertTrue(tmpTableIds.contains(tbl2.getId()));
+            Assert.assertTrue(tmpTableIds.contains(tbl3.getId()));
+            Assert.assertEquals(3, groupId.getTblId2DbIdSize());
+            Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId()));
+            Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId()));
+            Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl3.getId()));
+        }
+
+        // modify table's colocate group
+        String modifyStmt = "alter table db2.test_global_colocate3 set ('colocate_with' = '');";
+        alterTableSync(modifyStmt);
+        tableIds = colocateTableIndex.getAllTableIds(groupId);
+        Assert.assertEquals(2, tableIds.size());
+        Assert.assertTrue(tableIds.contains(tbl1.getId()));
+        Assert.assertTrue(tableIds.contains(tbl2.getId()));
+        Assert.assertEquals(2, groupId.getTblId2DbIdSize());
+        Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId()));
+        Assert.assertEquals(db2.getId(), groupId.getDbIdByTblId(tbl2.getId()));
 
+        // change table's colocate group
+        modifyStmt = "alter table db2.test_global_colocate2 set ('colocate_with' = '" + GLOBAL_GROUP2 + "');";
+        alterTableSync(modifyStmt);
+        tableIds = colocateTableIndex.getAllTableIds(groupId);
+        Assert.assertEquals(1, tableIds.size());
+        Assert.assertTrue(tableIds.contains(tbl1.getId()));
+        Assert.assertEquals(1, groupId.getTblId2DbIdSize());
+        Assert.assertEquals(db1.getId(), groupId.getDbIdByTblId(tbl1.getId()));
+
+        GroupId groupId2 = colocateTableIndex.getGroupSchema(
+                GroupId.getFullGroupName(1000, GLOBAL_GROUP2)).getGroupId();
+        tableIds = colocateTableIndex.getAllTableIds(groupId2);
+        Assert.assertEquals(1, tableIds.size());
+        Assert.assertTrue(tableIds.contains(tbl2.getId()));
+        Assert.assertEquals(1, groupId2.getTblId2DbIdSize());
+        Assert.assertEquals(db2.getId(), groupId2.getDbIdByTblId(tbl2.getId()));
+
+        // checkpoint
+        // Get currentCatalog first
+        Env currentEnv = Env.getCurrentEnv();
+        // Save real ckptThreadId
+        long ckptThreadId = currentEnv.getCheckpointer().getId();
+        try {
+            // set checkpointThreadId to current thread id, so that when do checkpoint manually here,
+            // the Catalog.isCheckpointThread() will return true.
+            Deencapsulation.setField(Env.class, "checkpointThreadId", Thread.currentThread().getId());
+            currentEnv.getCheckpointer().doCheckpoint();
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        } finally {
+            // Restore the ckptThreadId
+            Deencapsulation.setField(Env.class, "checkpointThreadId", ckptThreadId);
+        }
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c96e460b26..72ac00d60e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -617,6 +617,12 @@ public abstract class TestWithFeService {
         Thread.sleep(100);
     }
 
+    protected void alterTableSync(String sql) throws Exception {
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Env.getCurrentEnv().alterTable(alterTableStmt);
+        Thread.sleep(100);
+    }
+
     protected void createMv(String sql) throws Exception {
         CreateMaterializedViewStmt createMaterializedViewStmt =
                 (CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
diff --git a/regression-test/data/correctness_p0/test_colocate_join.out b/regression-test/data/correctness_p0/test_colocate_join.out
new file mode 100644
index 0000000000..aa5795a72f
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_colocate_join.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !global1 --
+1	jack	2022-01-01	1	jack	2022-01-01	10
+2	jack1	2022-01-02	2	jack1	2022-01-02	11
+3	jack2	2022-01-03	3	jack2	2022-01-03	12
+4	jack3	2022-02-01	4	jack3	2022-02-01	13
+5	jack4	2022-02-01	5	jack4	2022-02-01	14
+
+-- !global2 --
+1	jack	2022-01-01	1	jack	2022-01-01	10
+2	jack1	2022-01-02	2	jack1	2022-01-02	11
+3	jack2	2022-01-03	3	jack2	2022-01-03	12
+4	jack3	2022-02-01	4	jack3	2022-02-01	13
+5	jack4	2022-02-01	5	jack4	2022-02-01	14
+
+-- !global3 --
+1	jack	2022-01-01	1	jack	2022-01-01	10
+2	jack1	2022-01-02	2	jack1	2022-01-02	11
+3	jack2	2022-01-03	3	jack2	2022-01-03	12
+4	jack3	2022-02-01	4	jack3	2022-02-01	13
+5	jack4	2022-02-01	5	jack4	2022-02-01	14
+
diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy
index e8bd7206b8..e5d7ffcbec 100644
--- a/regression-test/suites/correctness_p0/test_colocate_join.groovy
+++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy
@@ -16,11 +16,22 @@
 // under the License.
 
 suite("test_colocate_join") {
+    def db1 = "test_colocate_join_db1"
+    def db2 = "test_colocate_join_db2"
+    sql """ drop database if exists ${db1}"""
+    sql """ drop database if exists ${db2}"""
+    sql """ create database if not exists ${db1}"""
+    sql """ create database if not exists ${db2}"""
+    sql """ use ${db1}"""
+
     sql """ DROP TABLE IF EXISTS `test_colo1` """
     sql """ DROP TABLE IF EXISTS `test_colo2` """
     sql """ DROP TABLE IF EXISTS `test_colo3` """
     sql """ DROP TABLE IF EXISTS `test_colo4` """
     sql """ DROP TABLE IF EXISTS `test_colo5` """
+    sql """ DROP TABLE IF EXISTS `test_global_tbl1` """
+    sql """ DROP TABLE IF EXISTS `test_global_tbl2` """
+    sql """ DROP TABLE IF EXISTS ${db2}.`test_global_tbl3` """
 
     sql """
         CREATE TABLE `test_colo1` (
@@ -112,6 +123,46 @@ suite("test_colocate_join") {
         );
     """
 
+    sql """
+        create table test_global_tbl1 (
+            id int,
+            name varchar(100),
+            dt date
+        )
+        distributed by hash(id, name) buckets 4
+        properties("colocate_with" = "__global__group1",
+            "replication_num" = "1");
+    """
+
+    sql """
+        create table test_global_tbl2 (
+            id int,
+            name varchar(20),
+            dt date,
+            age bigint
+        )
+        distributed by hash(id, name) buckets 4
+        properties("colocate_with" = "__global__group1",
+            "replication_num" = "1");
+    """
+
+    sql """
+        create table ${db2}.test_global_tbl3 (
+            id int,
+            name varchar(50),
+            dt date,
+            age bigint
+        )
+        partition by range(dt) (
+            partition p1 values less than("2022-02-01"),
+            partition p2 values less than("2022-03-01"),
+            partition p3 values less than("2022-04-01")
+        )
+        distributed by hash(id, name) buckets 4
+        properties("colocate_with" = "__global__group1",
+            "replication_num" = "1");
+    """
+
     sql """insert into test_colo1 values('1','a',12);"""
     sql """insert into test_colo2 values('1','a',12);"""
     sql """insert into test_colo3 values('1','a',12);"""
@@ -193,4 +244,64 @@ suite("test_colocate_join") {
     sql """ DROP TABLE IF EXISTS `tbl1`;"""
     sql """ DROP TABLE IF EXISTS `tbl2`;"""
 
+    sql """insert into ${db1}.test_global_tbl1 values
+            (1,"jack", "2022-01-01"),
+            (2,"jack1", "2022-01-02"),
+            (3,"jack2", "2022-01-03"),
+            (4,"jack3", "2022-02-01"),
+            (5,"jack4", "2022-02-01"),
+            (6, null, "2022-03-01");
+    """
+
+    sql """insert into ${db1}.test_global_tbl2 values
+            (1,"jack", "2022-01-01", 10),
+            (2,"jack1", "2022-01-02", 11),
+            (3,"jack2", "2022-01-03", 12),
+            (4,"jack3", "2022-02-01", 13),
+            (5,"jack4", "2022-02-01", 14),
+            (6,null, "2022-03-01", 15);
+    """
+
+    sql """insert into ${db2}.test_global_tbl3 values
+            (1,"jack", "2022-01-01", 10),
+            (2,"jack1", "2022-01-02", 11),
+            (3,"jack2", "2022-01-03", 12),
+            (4,"jack3", "2022-02-01", 13),
+            (5,"jack4", "2022-02-01", 14),
+            (6,null, "2022-03-01", 15);
+    """
+
+    order_qt_global1 """select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name """
+    order_qt_global2 """select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name """
+
+    explain {
+        sql ("select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name")
+        contains "COLOCATE"
+    }
+    explain {
+        sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name")
+        contains "COLOCATE"
+    }
+    /* add partition */
+    sql """alter table ${db2}.test_global_tbl3 add partition p4 values less than("2022-05-01")"""
+    sql """insert into ${db2}.test_global_tbl3 values (7, "jack7", "2022-04-01", 16)"""
+    order_qt_global3 """select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name """
+    explain {
+        sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name")
+        contains "COLOCATE"
+    }
+
+    /* modify group: unset */
+    sql """alter table ${db2}.test_global_tbl3 set ("colocate_with" = "");"""
+    explain {
+        sql ("select * from ${db1}.test_global_tbl1 a join ${db2}.test_global_tbl3 b on a.id = b.id and a.name = b.name")
+        contains "Tables are not in the same group"
+    }
+
+    /* modify group: from global to database level */
+    sql """alter table ${db1}.test_global_tbl2 set ("colocate_with" = "db_level_group");"""
+    explain {
+        sql ("select * from ${db1}.test_global_tbl1 a join ${db1}.test_global_tbl2 b on a.id = b.id and a.name = b.name")
+        contains "Tables are not in the same group"
+    }
 }

