You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2019/09/18 06:16:31 UTC

[incubator-doris] branch master updated: Add a ALTER operation to change distribution type from RANDOM to HASH (#1823)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e70e48c  Add a ALTER operation to change distribution type from RANDOM to HASH (#1823)
e70e48c is described below

commit e70e48c01e28733171bf5dea2a0024994a253d1f
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Wed Sep 18 14:16:26 2019 +0800

    Add a ALTER operation to change distribution type from RANDOM to HASH (#1823)
    
    Random distribution is no longer supported since version 0.9.
    And we need a way to convert the random distribution to hash distribution.
    
        ALTER TABLE db.tbl SET ("distribution_type" = "hash");
---
 .../sql-statements/Data Definition/ALTER TABLE.md  |   7 +-
 .../Data Manipulation/BROKER LOAD.md               |  29 +-
 .../sql-statements/Data Manipulation/LOAD.md       |  40 ---
 .../Data Definition/ALTER TABLE_EN.md              |   4 +
 .../Data Manipulation/BROKER LOAD_EN.md            | 380 +++++++++++++++++++++
 .../sql-statements/Data Manipulation/LOAD_EN.md    |  24 --
 .../Data Manipulation/broker_load_EN.md            | 344 -------------------
 fe/src/main/java/org/apache/doris/alter/Alter.java |   3 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   9 +-
 .../apache/doris/analysis/AbstractBackupStmt.java  |   3 +-
 .../apache/doris/analysis/CancelBackupStmt.java    |   4 +-
 .../analysis/ModifyTablePropertiesClause.java      |  42 +--
 .../java/org/apache/doris/catalog/Catalog.java     |  28 ++
 .../java/org/apache/doris/catalog/OlapTable.java   |  16 +
 .../java/org/apache/doris/catalog/Partition.java   |   9 +
 .../doris/catalog/RandomDistributionInfo.java      |  13 +
 .../apache/doris/common/util/PropertyAnalyzer.java |   2 +
 .../org/apache/doris/journal/JournalEntity.java    |   5 +
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../java/org/apache/doris/persist/TableInfo.java   |   4 +
 21 files changed, 526 insertions(+), 450 deletions(-)

diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
index e967d37..f61c69e 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
@@ -232,7 +232,12 @@
         PROPERTIES ("bloom_filter_columns"="k1,k2,k3");
 
     12. 修改表的Colocate 属性
-        ALTER TABLE example_db.my_table set ("colocate_with"="t1");
+
+        ALTER TABLE example_db.my_table set ("colocate_with" = "t1");
+
+    13. 将表的分桶方式由 Random Distribution 改为 Hash Distribution
+
+        ALTER TABLE example_db.my_table set ("distribution_type" = "hash");
         
     [rename]
     1. 将名为 table1 的表修改为 table2
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index e9210a3..fd1e499 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
@@ -1,7 +1,7 @@
 # BROKER LOAD
 ## description
 
-    Broker load 通过随 Palo 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
+    Broker load 通过随 Doris 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
     可以通过 show broker 命令查看已经部署的 broker。
     目前支持以下4种数据源:
 
@@ -316,7 +316,8 @@
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
-     8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
+    8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
+
         LOAD LABEL example_db.label9
         (
         DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
@@ -326,7 +327,29 @@
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
-     9. 对待导入数据进行过滤,k1 值大于 k2 值的列才能被导入
+    9. 提取文件路径中的分区字段
+
+        如果需要,则会根据表中定义的字段类型解析文件路径中的分区字段(partitioned fields),类似Spark中Partition Discovery的功能
+
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
+        INTO TABLE `my_table`
+        FORMAT AS "csv"
+        (k1, k2, k3)
+        COLUMNS FROM PATH AS (city, utc_date)
+        SET (uniq_id = md5sum(k1, city))
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:
+
+        [hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]
+
+        则提取文件路径的中的city和utc_date字段
+
+    10. 对待导入数据进行过滤,k1 值大于 k2 值的列才能被导入
+
         LOAD LABEL example_db.label10
         (
         DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md
index 865bfc3..9bde11c 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md	
@@ -262,46 +262,6 @@
         )
         );
 
-        LOAD LABEL example_db.label8
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, p2)
-        COLUMNS TERMINATED BY ","
-        (k1, k2, tmp_k3, tmp_k4, v1, v2)
-        SET (
-          v1 = hll_hash(tmp_k3),
-          v2 = hll_hash(tmp_k4)
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-    8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
-        LOAD LABEL example_db.label9
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        FORMAT AS "parquet"
-        (k1, k2, k3)
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); 
-        
-    9. 提取文件路径中的压缩字段
-        如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition Discovery的功能
-        LOAD LABEL example_db.label10
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
-        INTO TABLE `my_table`
-        FORMAT AS "csv"
-        (k1, k2, k3)
-        COLUMNS FROM PATH AS (city, utc_date)
-        SET (uniq_id = md5sum(k1, city))
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-        hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]
-        则提取文件路径的中的city和utc_date字段
-
 ## keyword
     LOAD
     
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER TABLE_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER TABLE_EN.md
index 6a821d7..35b38c0 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER TABLE_EN.md	
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER TABLE_EN.md	
@@ -225,6 +225,10 @@ PROPERTIES ("bloom_filter_columns"="k1,k2,k3");
 12. Modify the Colocate property of the table
 ALTER TABLE example_db.my_table set ("colocate_with"="t1");
 
+13. Change the Distribution type from Random to Hash
+
+ALTER TABLE example_db.my_table set ("distribution_type" = "hash");
+
 [Rename]
 1. Modify the table named Table 1 to table2
 ALTER TABLE table1 RENAME table2;
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md
new file mode 100644
index 0000000..7eefcfa
--- /dev/null
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md	
@@ -0,0 +1,380 @@
+# BROKER LOAD
+## description
+
+    Broker load will load data into Doris via Broker.
+    Use `show broker;` to see the Broker deployed in cluster.
+    
+    Support following data sources:
+
+    1. Baidu HDFS: hdfs for Baidu. Only be used inside Baidu.
+    2. Baidu AFS: afs for Baidu. Only be used inside Baidu.
+    3. Baidu Object Storage(BOS): BOS on Baidu Cloud.
+    4. Apache HDFS.
+
+### Syntax: 
+
+    LOAD LABEL load_label
+    (
+    data_desc1[, data_desc2, ...]
+    )
+    WITH BROKER broker_name
+    [broker_properties]
+    [opt_properties];
+
+    1. load_label
+
+        Unique load label within a database.
+        syntax: 
+        [database_name.]your_label
+     
+    2. data_desc
+
+        To describe the data source. 
+        syntax: 
+            DATA INFILE
+            (
+            "file_path1"[, file_path2, ...]
+            )
+            [NEGATIVE]
+            INTO TABLE `table_name`
+            [PARTITION (p1, p2)]
+            [COLUMNS TERMINATED BY "column_separator"]
+            [FORMAT AS "file_type"]
+            [(column_list)]
+            [SET (k1 = func(k2))]
+            [WHERE predicate]    
+
+        Explain: 
+            file_path: 
+
+            File path. Support wildcard. Must match to file, not directory. 
+
+            PARTITION:
+
+            Data will only be loaded to specified partitions. Data out of partition's range will be filtered. If not specifed, all partitions will be loaded.
+                    
+            NEGATIVE: 
+            
+            If this parameter is specified, it is equivalent to importing a batch of "negative" data to offset the same batch of data loaded before.
+            
+            This parameter applies only to the case where there are value columns and the aggregation type of value columns is only SUM.
+            
+            column_separator: 
+            
+            Used to specify the column separator in the import file. Default is `\t`.
+            If the character is invisible, it needs to be prefixed with `\\x`, using hexadecimal to represent the separator.
+
+            For example, the separator `\x01` of the hive file is specified as `\\ x01`
+            
+            file_type: 
+
+            Used to specify the type of imported file, such as parquet, csv. Default values are determined by the file suffix name. 
+ 
+            column_list: 
+
+            Used to specify the correspondence between columns in the import file and columns in the table.
+
+            When you need to skip a column in the import file, specify it as a column name that does not exist in the table.
+
+            syntax: 
+            (col_name1, col_name2, ...)
+            
+            SET:
+            
+            If this parameter is specified, a column of the source file can be transformed according to a function, and then the transformed result can be loaded into the table. The grammar is `column_name = expression`. Some examples are given to help understand.
+
+            Example 1: There are three columns "c1, c2, c3" in the table. The first two columns in the source file correspond in turn (c1, c2), and the last two columns correspond to c3. Then, column (c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4) should be specified.
+
+            Example 2: There are three columns "year, month, day" in the table. There is only one time column in the source file, in the format of "2018-06-01:02:03". Then you can specify columns (tmp_time) set (year = year (tmp_time), month = month (tmp_time), day = day (tmp_time)) to complete the import.
+
+            WHERE:
+          
+            After filtering the transformed data, data that meets where predicates can be loaded. Only column names in tables can be referenced in WHERE statements.
+            
+    3. broker_name
+
+        The name of the Broker used can be viewed through the `show broker` command.
+
+    4. broker_properties
+
+        Used to provide Broker access to data sources. Different brokers, and different access methods, need to provide different information.
+
+        1. Baidu HDFS/AFS
+
+            Access to Baidu's internal hdfs/afs currently only supports simple authentication, which needs to be provided:
+            
+            username: hdfs username
+            password: hdfs password
+
+        2. BOS
+
+            bos_endpoint.
+            bos_accesskey: cloud user's accesskey
+            bos_secret_accesskey: cloud user's secret_accesskey
+        
+        3. Apache HDFS
+
+            Community version of HDFS supports simple authentication, Kerberos authentication, and HA configuration.
+
+            Simple authentication:
+            hadoop.security.authentication = simple (default)
+            username: hdfs username
+            password: hdfs password
+
+            kerberos authentication: 
+            hadoop.security.authentication = kerberos
+            kerberos_principal:  kerberos's principal
+            kerberos_keytab:  path of kerberos's keytab file. This file should be able to access by Broker
+            kerberos_keytab_content: Specify the contents of the KeyTab file in Kerberos after base64 encoding. This option is optional from the kerberos_keytab configuration. 
+
+            namenode HA: 
+            By configuring namenode HA, new namenode can be automatically identified when the namenode is switched
+            dfs.nameservices: hdfs service name,customize,eg: "dfs.nameservices" = "my_ha"
+            dfs.ha.namenodes.xxx: Customize the name of a namenode, separated by commas. XXX is a custom name in dfs. name services, such as "dfs. ha. namenodes. my_ha" = "my_nn"
+            dfs.namenode.rpc-address.xxx.nn: Specify RPC address information for namenode, where NN denotes the name of the namenode configured in dfs.ha.namenodes.xxxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn"= "host:port"
+            dfs.client.failover.proxy.provider: Specify the provider that client connects to namenode by default: org. apache. hadoop. hdfs. server. namenode. ha. Configured Failover ProxyProvider.
+
+    4. opt_properties
+
+        Used to specify some special parameters. 
+        Syntax: 
+        [PROPERTIES ("key"="value", ...)]
+        
+        You can specify the following parameters: 
+        
+        timout: Specifies the timeout time for the import operation. The default timeout is 4 hours per second.
+
+        max_filter_ratio: Data ratio of maximum tolerance filterable (data irregularity, etc.). Default zero tolerance.
+
+        exc_mem_limit: Sets the upper memory limit for import. The default is 2G, per byte. This is the upper memory limit for a single BE node.
+
+        A load may be distributed over multiple BEs. Let's assume that 1GB data needs up to 5GB of memory for processing at a single node. Suppose 1GB files are distributed over two nodes, then theoretically, each node needs 2.5GB of memory. Then the parameter can be set to 268454560, or 2.5GB.
+
+        strict_mode: Whether the data is strictly restricted. The default is true.
+
+        timezone: Specify time zones for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, use the "Asia/Shanghai" time zone.
+
+    5. Load data format sample
+
+        Integer(TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
+        Float(FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
+        Date(DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03. 
+        (Note: If it's in other date formats, you can use strftime or time_format functions to convert in the import command)
+        
+        String(CHAR/VARCHAR): "I am a student", "a"
+        NULL: \N
+
+## example
+
+    1. Load a batch of data from HDFS, specify timeout and filtering ratio. Use the broker with the inscription my_hdfs_broker. Simple authentication.
+
+        LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+    
+        Where hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000)
+        
+    2. Load a batch of data from AFS contains multiple files. Import different tables, specify separators, and specify column correspondences.
+
+        LOAD LABEL example_db.label2
+        (
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file1")
+        INTO TABLE `my_table_1`
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2),
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file2")
+        INTO TABLE `my_table_2`
+        COLUMNS TERMINATED BY "\t"
+        (k1, k2, k3, v2, v1)
+        )
+        WITH BROKER my_afs_broker
+        (
+        "username" = "afs_user",
+        "password" = "afs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+        
+
+    3. Load a batch of data from HDFS, specify hive's default delimiter \\x01, and use wildcard * to specify all files in the directory. Use simple authentication and configure namenode HA at the same time
+
+        LOAD LABEL example_db.label3
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\\x01"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd",
+        "dfs.nameservices" = "my_ha",
+        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        )
+    
+    4. Load a batch of "negative" data from HDFS. Use Kerberos authentication to provide KeyTab file path.
+
+        LOAD LABEL example_db.label4
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
+        NEGATIVE
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\t"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication" = "kerberos",
+        "kerberos_principal"="doris@YOUR.COM",
+        "kerberos_keytab"="/home/palo/palo.keytab"
+        )
+
+    5. Load a batch of data from HDFS, specify partition. At the same time, use Kerberos authentication mode. Provide the KeyTab file content encoded by base64.
+
+        LOAD LABEL example_db.label5
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2)
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication"="kerberos",
+        "kerberos_principal"="doris@YOUR.COM",
+        "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
+        )
+
+    6. Load a batch of data from BOS, specify partitions, and make some transformations to the columns of the imported files, as follows:
+    
+       Table schema: 
+        k1 varchar(20)
+        k2 int
+
+        Assuming that the data file has only one row of data:
+
+        Adele,1,1
+
+        The columns in the data file correspond to the columns specified in the load statement:
+        
+        k1,tmp_k2,tmp_k3
+
+        transform as: 
+
+        1) k1: unchanged
+        2) k2: sum of tmp_k2 and tmp_k3
+
+        LOAD LABEL example_db.label6
+        (
+        DATA INFILE("bos://my_bucket/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, tmp_k2, tmp_k3)
+        SET (
+          k2 = tmp_k2 + tmp_k3
+        )
+        )
+        WITH BROKER my_bos_broker
+        (
+        "bos_endpoint" = "http://bj.bcebos.com",
+        "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
+        )
+    
+    7. Load data into tables containing HLL columns, which can be columns in tables or columns in data
+    
+        If there are three columns in the table (id, v1, v2). The V1 and V2 columns are HLL columns. The imported source file has three columns. Then (column_list) declares that the first column is id, and the second and third columns are temporarily named k1, k2.
+
+        In SET, the HLL column in the table must be specifically declared hll_hash. The V1 column in the table is equal to the hll_hash (k1) column in the original data.        
+
+        LOAD LABEL example_db.label7
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (id, k1, k2)
+        SET (
+          v1 = hll_hash(k1),
+          v2 = hll_hash(k2)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        LOAD LABEL example_db.label8
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k2, tmp_k3, tmp_k4, v1, v2)
+        SET (
+          v1 = hll_hash(tmp_k3),
+          v2 = hll_hash(tmp_k4)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+    8. Data in load Parquet file specifies FORMAT as parquet. By default, it is judged by file suffix.
+
+        LOAD LABEL example_db.label9
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        FORMAT AS "parquet"
+        (k1, k2, k3)
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+    9. Extract partition fields in file paths
+
+        If necessary, partitioned fields in the file path are resolved based on the field type defined in the table, similar to the Partition Discovery function in Spark.
+
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
+        INTO TABLE `my_table`
+        FORMAT AS "csv"
+        (k1, k2, k3)
+        COLUMNS FROM PATH AS (city, utc_date)
+        SET (uniq_id = md5sum(k1, city))
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        Directory `hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing` contains following files:
+         
+        [hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]
+
+        Extract city and utc_date fields in the file path
+
+    10. To filter the load data, columns whose K1 value is greater than K2 value can be imported.
+    
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        where k1 > k2
+        );
+     
+## keyword
+
+    BROKER,LOAD
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/LOAD_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/LOAD_EN.md
index 3ca2655..a8bdf0a 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/LOAD_EN.md	
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/LOAD_EN.md	
@@ -255,30 +255,6 @@ v2 = hll, u hash (k2)
 )
 );
 
-LOAD LABEL example db.label8
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(k1, k2, tmp u k3, tmp u k4, v1, v2)
-SET (
-v1 = hll, u hash (tmp
-v2 = hll, u hash (tmp
-)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-8. Importing data into Parquet file specifies FORMAT as parquet, which is judged by file suffix by default.
-LOAD LABEL example db.label9
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-FORMAT AS "parquet"
-(k1, k2, k3)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
 ## keyword
 LOAD
 
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md
deleted file mode 100644
index 6c67996..0000000
--- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md	
+++ /dev/null
@@ -1,344 +0,0 @@
-# BROKER LOAD
-## Description
-
-    Broker load accesses data from corresponding data sources and imports data through broker deployed with Palo cluster.
-    You can view the deployed broker through the show broker command.
-    The following four data sources are currently supported:
-
-    1. Baidu HDFS: Baidu's internal HDFS are limited to Baidu's internal use.
-    2. Baidu AFS: Baidu's internal AFs are limited to Baidu's internal use.
-    3. Baidu Object Storage (BOS): Baidu Object Storage. Only Baidu internal users, public cloud users or other users who can access BOS.
-Four. Apache HDFS
-
-Grammar:
-
-    LOAD LABEL load_label
-    (
-    Date of date of date of entry
-    )
-    WITH BROKER broker_name
-    [broker_properties]
-    [opt_properties];
-
-    1. load label
-    
-        The label of the current imported batch. Unique in a database.
-        Grammar:
-        [database_name.]your_label
-    
-    2. data_desc
-    
-        Used to describe a batch of imported data.
-        Grammar:
-            DATA INFILE
-            (
-            "file_path1"[, file_path2, ...]
-            )
-            [NEGATIVE]
-            INTO TABLE `table_name`
-            [PARTITION (p1, P2)]
-            [COLUMNS TERMINATED BY "column_separator"]
-            [FORMAT AS "file_type"]
-            [(column_list)]
-            [set (k1 = fun (k2)]]
-            [WHERE predicate]    
-    
-        Explain:
-            file_path:
-            
-            File paths can be specified to a file, or * wildcards can be used to specify all files in a directory. Wildcards must match to files, not directories.
-            
-            PARTICIPATION:
-            
-            If this parameter is specified, only the specified partition will be imported, and data outside the imported partition will be filtered out.
-            If not specified, all partitions of the table are imported by default.
-            
-            NEGATIVE:
-            If this parameter is specified, it is equivalent to importing a batch of "negative" data. Used to offset the same batch of data imported before.
-            This parameter applies only to the case where there are value columns and the aggregation type of value columns is SUM only.
-            
-            Column U separator:
-            
-            Used to specify the column separator in the import file. Default tot
-            If the character is invisible, it needs to be prefixed with \x, using hexadecimal to represent the separator.
-            For example, the separator X01 of the hive file is specified as "\ x01"
-            
-            File type:
-            
-            Used to specify the type of imported file, such as parquet, csv. The default value is determined by the file suffix name.
-            
-            column_list:
-            
-            Used to specify the correspondence between columns in the import file and columns in the table.
-            When you need to skip a column in the import file, specify it as a column name that does not exist in the table.
-            Grammar:
-            (col_name1, col_name2, ...)
-            
-            SET:
-            
-            If this parameter is specified, a column of the source file can be transformed according to a function, and then the transformed result can be imported into the table.
-            Example1: table has columns (c1, c2, c3), source file has four columns. The first and second columns in source file map to the c1, c2 and the sum of 3th and 4th map to the c3. So the column mapping is columns (c1,c2,tmp_c3,tmp_c4) SET (c3=tmp_c3+tmp_c4);  
-    
-    3. broker name
-    
-        The name of the broker used can be viewed through the show broker command.
-    
-    4. broker_properties
-    
-        Used to provide information to access data sources through broker. Different brokers, as well as different access methods, need to provide different information.
-        
-        1. HDFS /AFS Baidu
-        
-            Access to Baidu's internal hdfs/afs currently only supports simple authentication, which needs to be provided:
-            Username: HDFS username
-            password -hdfs
-        
-        2. BOS
-        
-            Need to provide:
-            Bos_endpoint: endpoint of BOS
-            Bos_accesskey: Accesskey for public cloud users
-            Bos_secret_access key: secret_access key for public cloud users
-        
-        3. Apache HDFS
-        
-            Community version of HDFS supports simple authentication and Kerberos authentication. And support HA configuration.
-            Simple authentication:
-            hadoop.security.authentication = simple (默认)
-            Username: HDFS username
-            password -hdfs
-            
-            Kerberos authentication:
-            hadoop.security.authentication = kerberos
-            Kerberos_principal: Specifies the principal of Kerberos
-            Kerberos_keytab: Specifies the KeyTab file path for kerberos. This file must be a file on the server where the broker process resides.
-            Kerberos_keytab_content: Specifies the content of the KeyTab file in Kerberos after base64 encoding. This is a choice from the kerberos_keytab configuration.
-            
-            HA code
-            By configuring namenode HA, new namenode can be automatically identified when the namenode is switched
-            Dfs. nameservices: Specify the name of the HDFS service and customize it, such as: "dfs. nameservices" = "my_ha"
-            Dfs.ha.namenodes.xxx: Customize the name of the namenode, with multiple names separated by commas. Where XXX is a custom name in dfs. name services, such as "dfs. ha. namenodes. my_ha" = "my_nn"
-            Dfs.namenode.rpc-address.xxx.nn: Specify RPC address information for namenode. Where NN denotes the name of the namenode configured in dfs.ha.namenodes.xxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn"= "host:port"
-            Dfs.client.failover.proxy.provider: Specifies the provider that client connects to namenode by default: org.apache.hadoop.hdfs.server.namenode.ha.Configured Failover ProxyProvider
-    
-    4. opt_properties
-    
-        Used to specify some special parameters.
-        Grammar:
-        [PROPERTIES ("key"="value", ...)]
-        
-        The following parameters can be specified:
-        Timeout: Specifies the timeout time of the import operation. The default timeout is 4 hours. Unit seconds.
-        Max_filter_ratio: The ratio of data that is most tolerant of being filterable (for reasons such as data irregularities). Default zero tolerance.
-        Exec_mem_limit: Sets the upper memory limit for import use. Default is 2G, unit byte. This refers to the upper memory limit of a single BE node.
-        An import may be distributed across multiple BEs. We assume that processing 1GB data at a single node requires up to 5GB of memory. Assuming that a 1GB file is distributed among two nodes, then theoretically, each node needs 2.5GB of memory. Then the parameter can be set to 268454560, or 2.5GB.
-        Strict mode: is there a strict restriction on data? The default is true.
-    
-    5. Import data format sample
-    
-        Integer classes (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1,1000,1234
-        Floating Point Class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, 356
-        
-        (Note: If it's in other date formats, you can use strftime or time_format functions to convert in the import command)
-        字符串类(CHAR/VARCHAR):"I am a student", "a"
-NULL value: N
-
-## example
-
-    1. Import a batch of data from HDFS, specifying the timeout time and filtering ratio. Use the broker with the inscription my_hdfs_broker. Simple authentication.
-    
-        LOAD LABEL example db.label1
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "Username" = "HDFS\\ user"
-        "password" = "hdfs_passwd"
-        )
-        PROPERTIES
-        (
-        Timeout ="3600",
-        "max_filter_ratio" = "0.1"
-        );
-        
-        Where hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000)
-    
-    2. A batch of data from AFS, including multiple files. Import different tables, specify separators, and specify column correspondences.
-    
-        LOAD LABEL example db.label2
-        (
-        DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data /input /file1")
-        INTO TABLE `my_table_1`
-        COLUMNS TERMINATED BY ","
-        (k1, k3, k2, v1, v2),
-        DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data /input /file2")
-        INTO TABLE `my_table_2`
-        COLUMNS TERMINATED BY "\t"
-        (k1, k2, k3, v2, v1)
-        )
-        WITH BROKER my_afs_broker
-        (
-        "username" ="abu user",
-        "password" = "afs_passwd"
-        )
-        PROPERTIES
-        (
-        Timeout ="3600",
-        "max_filter_ratio" = "0.1"
-        );
-    
-    
-    3. Import a batch of data from HDFS, specify hive's default delimiter x01, and use wildcard * to specify all files in the directory.
-    Use simple authentication and configure namenode HA at the same time
-    
-        LOAD LABEL example db.label3
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
-        INTO TABLE `my_table`
-        COLUMNS TERMINATED BY "\\x01"
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "Username" = "HDFS\\ user"
-        "password" = "hdfs_passwd",
-        "dfs.nameservices" = "my_ha",
-        "dfs.ha.namodes.my -ha" ="we named1, we named2",
-        "dfs.namode.rpc -address.my ha.my name1" ="nn1 guest:rpc port",
-        "dfs.namode.rpc -address.my ha.my name2" ="nn2 guest:rpc port",
-        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-        )
-    
-    4. Import a batch of "negative" data from HDFS. At the same time, Kerberos authentication is used. Provide KeyTab file path.
-    
-        LOAD LABEL example db.label4
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
-        NEGATIVE
-        INTO TABLE `my_table`
-        COLUMNS TERMINATED BY "\t"
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "hadoop.security.authentication" = "kerberos",
-        "kerberos" principal ="doris @YOUR.COM",
-        "kerberos" keytab ="/home /palo /palo.keytab"
-        )
-    
-    5. Import a batch of data from HDFS and specify partitions. At the same time, Kerberos authentication is used. Provides the KeyTab file content encoded by base64.
-    
-        LOAD LABEL example db.label5
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (k1, k3, k2, v1, v2)
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "hadoop.security.authentication"="kerberos",
-        "kerberos" principal ="doris @YOUR.COM",
-        "kerberos" keytab "content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
-        )
-    
-    6. Import a batch of data from BOS, specify partitions, and make some transformations to the columns of imported files, as follows:
-        The table structure is as follows:
-        K1 date
-        date
-        k3 bigint
-        k4 varchar (20)
-        k5 varchar (64)
-        k6 int
-        
-        Assume that the data file has only one row of data:
-        
-        1537002087,2018-08-09 11:12:13,1537002087,-,1
-        
-        The columns in the data file correspond to the columns specified in the import statement:
-        tmp -u k1, tmp -u k2, tmp u k3, k6, v1
-        
-        The conversion is as follows:
-        
-        1) k1: Transform tmp_k1 timestamp column into datetime type data
-        2) k2: Converting tmp_k2 datetime-type data into date data
-        3) k3: Transform tmp_k3 timestamp column into day-level timestamp
-        4) k4: Specify import default value of 1
-        5) k5: Calculate MD5 values from tmp_k1, tmp_k2, tmp_k3 columns
-        6) k6: Replace the - value in the imported file with 10
-        
-        LOAD LABEL example db.label6
-        (
-        DATA INFILE("bos://my_bucket/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (tmp /u k1, tmp /u k2, tmp /u k3, k6, v1)
-        SET (
-        
-        K2 = Time = UFormat ("% Y -% M -% D% H:% I = S", "% Y -% M -% D", TMP = UK2),
-        k3 = alignment_timestamp("day", tmp_k3),
-        k4 = default_value("1"),
-        K5 = MD5Sum (TMP = UK1, TMP = UK2, TMP = UK3)
-        k6 = replace value ("-", "10")
-        )
-        )
-        WITH BROKER my_bos_broker
-        (
-        "bosu endpoint" ="http://bj.bcebos.com",
-        "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
-        "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
-        )
-    
-    7. Import data into tables containing HLL columns, which can be columns in tables or columns in data
-    
-        If there are three columns in the table (id, v1, v2). Where V1 and V2 columns are HLL columns. The imported source file has three columns. In column_list, it is declared that the first column is ID and the second and third column is K1 and k2, which are temporarily named.
-        In SET, the HLL column in the table must be specifically declared hll_hash. The V1 column in the table is equal to the hll_hash (k1) column in the original data.
-        LOAD LABEL example db.label7
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (id, k1, k2)
-        SET (
-        v1 = hll, u hash (k1),
-        v2 = hll, u hash (k2)
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-        
-        LOAD LABEL example db.label8
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (k1, k2, tmp u k3, tmp u k4, v1, v2)
-        SET (
-        v1 = hll, u hash (tmp
-        v2 = hll, u hash (tmp
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-    
-    8. Importing data into Parquet file specifies FORMAT as parquet, which is judged by file suffix by default.
-        LOAD LABEL example db.label9
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        FORMAT AS "parquet"
-        (k1, k2, k3)
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-     9. Filter data by k1>k2 
-        LOAD LABEL example_db.label10
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        where k1 > k2
-        )
-## keyword
-BROKER,LOAD
diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 9c2ad2e..ab443dc 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -123,8 +123,7 @@ public class Alter {
                     || alterClause instanceof AddColumnsClause
                     || alterClause instanceof DropColumnClause
                     || alterClause instanceof ModifyColumnClause
-                    || alterClause instanceof ReorderColumnsClause
-                    || alterClause instanceof ModifyTablePropertiesClause)
+                    || alterClause instanceof ReorderColumnsClause)
                     && !hasAddRollup && !hasDropRollup && !hasPartition && !hasRename) {
                 hasSchemaChange = true;
             } else if (alterClause instanceof AddRollupClause && !hasSchemaChange && !hasAddRollup && !hasDropRollup
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 35b1618..55d92c0 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -81,8 +81,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.doris.catalog.AggregateType.BITMAP_UNION;
-
 public class SchemaChangeHandler extends AlterHandler {
     private static final Logger LOG = LogManager.getLogger(SchemaChangeHandler.class);
 
@@ -539,7 +537,7 @@ public class SchemaChangeHandler extends AlterHandler {
             throw new DdlException("HLL type column can only be in Aggregation data model table: " + newColName);
         }
 
-        if (newColumn.getAggregationType() == BITMAP_UNION && KeysType.AGG_KEYS != olapTable.getKeysType()) {
+        if (newColumn.getAggregationType() == AggregateType.BITMAP_UNION && KeysType.AGG_KEYS != olapTable.getKeysType()) {
             throw new DdlException("BITMAP_UNION must be used in AGG_KEYS");
         }
 
@@ -1305,7 +1303,10 @@ public class SchemaChangeHandler extends AlterHandler {
                 // so just return after finished handling.
                 if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
                     String colocateGroup = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
-                    Catalog.getInstance().modifyTableColocate(db, olapTable, colocateGroup, false, null);
+                    Catalog.getCurrentCatalog().modifyTableColocate(db, olapTable, colocateGroup, false, null);
+                    return;
+                } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+                    Catalog.getCurrentCatalog().convertDistributionType(db, olapTable);
                     return;
                 }
             }
diff --git a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
index d33d961..4bb83a1 100644
--- a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
@@ -70,8 +70,7 @@ public class AbstractBackupStmt extends DdlStmt {
         // the restore table may be newly created, so we can not judge its privs.
         if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(),
                 labelName.getDbName(), PrivPredicate.LOAD)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
-                    ConnectContext.get().getQualifiedUser(), labelName.getDbName());
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "LOAD");
         }
 
         checkAndNormalizeBackupObjs();
diff --git a/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java b/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
index 721f394..b1de068 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
@@ -59,8 +59,8 @@ public class CancelBackupStmt extends CancelStmt {
         }
 
         // check auth
-        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName, PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "LOAD");
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 183251c..2cd7d94 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -17,23 +17,16 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.common.util.PropertyAnalyzer;
 
 import java.util.Map;
 
 // clause which is used to modify table properties
 public class ModifyTablePropertiesClause extends AlterClause {
 
-    private static final String KEY_STORAGE_TYPE = "storage_type";
-    private static final String KEY_COLOCATE_WITH = "colocate_with";
-
     private Map<String, String> properties;
 
     public ModifyTablePropertiesClause(Map<String, String> properties) {
@@ -46,31 +39,24 @@ public class ModifyTablePropertiesClause extends AlterClause {
             throw new AnalysisException("Properties is not set");
         }
 
-        if (properties.size() == 1 && properties.containsKey(KEY_COLOCATE_WITH)) {
-            if (Config.disable_colocate_join) {
-                throw new AnalysisException("Colocate table is disabled by Admin");
-            }
-
-            if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(
-                    ConnectContext.get(), ConnectContext.get().getDatabase(), PrivPredicate.ALTER)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                        "ALTER");
-            }
-        } else if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ALTER)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                    "ALTER");
+        if (properties.size() != 1) {
+            throw new AnalysisException("Can only set one table property at a time");
         }
 
-        if (properties.containsKey(KEY_STORAGE_TYPE)) {
-            // if set storage type, we need ADMIN privs.
-            if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                                                    "ADMIN");
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
+            if (Config.disable_colocate_join) {
+                throw new AnalysisException("Colocate table is disabled by Admin");
             }
-
-            if (!properties.get(KEY_STORAGE_TYPE).equals("column")) {
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE)) {
+            if (!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).equalsIgnoreCase("column")) {
                 throw new AnalysisException("Can only change storage type to COLUMN");
             }
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+            if (!properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE).equalsIgnoreCase("hash")) {
+                throw new AnalysisException("Can only change distribution type to HASH");
+            }
+        } else {
+            throw new AnalysisException("Unknown table property: " + properties.keySet());
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index a120dc8..1e835ed 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -6134,5 +6134,33 @@ public class Catalog {
             replica.setBad(backendTabletsInfo.isBad());
         }
     }
+
+    // Convert table's distribution type from random to hash.
+    // random distribution is no longer supported.
+    public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
+        db.writeLock();
+        try {
+            if (!tbl.convertRandomDistributionToHashDistribution()) {
+                throw new DdlException("Table " + tbl.getName() + " is not random distributed");
+            }
+            TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
+            editLog.logModifyDitrubutionType(tableInfo);
+            LOG.info("finished to modify distribution type of table: " + tbl.getName());
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void replayConvertDistributionType(TableInfo tableInfo) {
+        Database db = getDb(tableInfo.getDbId());
+        db.writeLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableInfo.getTableId());
+            tbl.convertRandomDistributionToHashDistribution();
+            LOG.info("replay modify distribution type of table: " + tbl.getName());
+        } finally {
+            db.writeUnlock();
+        }
+    }
 }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 457eeaf..d02f957 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1115,4 +1115,20 @@ public class OlapTable extends Table {
         }
         return keysNum;
     }
+
+    public boolean convertRandomDistributionToHashDistribution() {
+        boolean hasChanged = false;
+        List<Column> baseSchema = indexIdToSchema.get(baseIndexId);
+        if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
+            defaultDistributionInfo = ((RandomDistributionInfo) defaultDistributionInfo).toHashDistributionInfo(baseSchema);
+            hasChanged = true;
+        }
+        
+        for (Partition partition : idToPartition.values()) {
+            if (partition.convertRandomDistributionToHashDistribution(baseSchema)) {
+                hasChanged = true;
+            }
+        }
+        return hasChanged;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java
index 286f7fd..d5afe6c 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -438,4 +438,13 @@ public class Partition extends MetaObject implements Writable {
 
         return buffer.toString();
     }
+
+    public boolean convertRandomDistributionToHashDistribution(List<Column> baseSchema) {
+        boolean hasChanged = false;
+        if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+            distributionInfo = ((RandomDistributionInfo) distributionInfo).toHashDistributionInfo(baseSchema);
+            hasChanged = true;
+        }
+        return hasChanged;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
index aadf69f..7458449 100644
--- a/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
+++ b/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
@@ -20,9 +20,12 @@ package org.apache.doris.catalog;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.RandomDistributionDesc;
 
+import com.google.common.collect.Lists;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Random partition.
@@ -89,4 +92,14 @@ public class RandomDistributionInfo extends DistributionInfo {
                 && bucketNum == randomDistributionInfo.bucketNum;
     }
 
+    public HashDistributionInfo toHashDistributionInfo(List<Column> baseSchema) {
+        List<Column> keyColumns = Lists.newArrayList();
+        for (Column column : baseSchema) {
+            if (column.isKey()) {
+                keyColumns.add(column);
+            }
+        }
+        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(bucketNum, keyColumns);
+        return hashDistributionInfo;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 4dcb157..0a74a70 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -70,6 +70,8 @@ public class PropertyAnalyzer {
     
     public static final String PROPERTIES_TIMEOUT = "timeout";
 
+    public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
+
     public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
             throws AnalysisException {
         DataProperty dataProperty = oldDataProperty;
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 06a8515..d12dc07 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -417,6 +417,11 @@ public class JournalEntity implements Writable {
                 needRead = false;
                 break;
             }
+            case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
+                data = TableInfo.read(in);
+                needRead = false;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 9fe2049..c508f1b 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -695,6 +695,11 @@ public class EditLog {
                     }
                     break;
                 }
+                case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
+                    TableInfo tableInfo = (TableInfo)journal.getData();
+                    catalog.replayConvertDistributionType(tableInfo);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1188,4 +1193,8 @@ public class EditLog {
     public void logAlterJob(AlterJobV2 alterJob) {
         logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
     }
+
+    public void logModifyDitrubutionType(TableInfo tableInfo) {
+        logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 5041cbc..c972e04 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -55,6 +55,7 @@ public class OperationType {
     public static final short OP_FINISH_CONSISTENCY_CHECK = 29;
     public static final short OP_RENAME_ROLLUP = 120;
     public static final short OP_ALTER_JOB_V2 = 121;
+    public static final short OP_MODIFY_DISTRIBUTION_TYPE = 122;
 
     // 30~39 130~139 230~239 ...
     // load job for only hadoop load
diff --git a/fe/src/main/java/org/apache/doris/persist/TableInfo.java b/fe/src/main/java/org/apache/doris/persist/TableInfo.java
index bf0af33..8529327 100644
--- a/fe/src/main/java/org/apache/doris/persist/TableInfo.java
+++ b/fe/src/main/java/org/apache/doris/persist/TableInfo.java
@@ -64,6 +64,10 @@ public class TableInfo implements Writable {
         return new TableInfo(dbId, tableId, -1L, partitionId, "", "", newPartitionName);
     }
 
+    public static TableInfo createForModifyDistribution(long dbId, long tableId) {
+        return new TableInfo(dbId, tableId, -1L, -1, "", "", "");
+    }
+
     public long getDbId() {
         return dbId;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org