You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/13 01:52:08 UTC

[incubator-doris] branch master updated: [feature](cold-hot) support s3 resource (#8808)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bca121333e [feature](cold-hot) support s3 resource (#8808)
bca121333e is described below

commit bca121333e40c6f63fbf8405f4dc5addf4ed1606
Author: qiye <ji...@gmail.com>
AuthorDate: Wed Apr 13 09:52:03 2022 +0800

    [feature](cold-hot) support s3 resource (#8808)
    
    Add cold hot support in FE meta, support alter resource DDL in FE
---
 docs/.vuepress/sidebar/en.js                       |   4 +
 docs/.vuepress/sidebar/zh-CN.js                    |   1 +
 .../sql-statements/Administration/ALTER SYSTEM.md  | 217 +++++++--------
 .../Data Definition/ALTER RESOURCE.md              |  48 ++++
 .../sql-statements/Data Definition/ALTER TABLE.md  |   1 +
 .../Data Definition/CREATE RESOURCE.md             | 134 +++++++++
 .../sql-statements/Data Definition/CREATE TABLE.md | 304 +++++++++++----------
 .../Data Definition/DROP RESOURCE.md               |  21 +-
 .../Data Definition/SHOW RESOURCES.md              |  15 +-
 .../{DROP RESOURCE.md => ALTER RESOURCE.md}        |  27 +-
 .../sql-statements/Data Definition/ALTER TABLE.md  |   1 +
 .../Data Definition/CREATE RESOURCE.md             | 101 +++++--
 .../sql-statements/Data Definition/CREATE TABLE.md | 233 +++++++++-------
 .../Data Definition/DROP RESOURCE.md               |  11 +-
 .../Data Definition/SHOW RESOURCES.md              |  18 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   6 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +
 .../main/java/org/apache/doris/alter/Alter.java    |  29 +-
 .../apache/doris/analysis/AlterResourceStmt.java   |  85 ++++++
 .../doris/analysis/ModifyPartitionClause.java      |  10 +-
 .../analysis/ModifyTablePropertiesClause.java      |   2 +
 .../java/org/apache/doris/catalog/Catalog.java     |  11 +
 .../org/apache/doris/catalog/DataProperty.java     |  43 ++-
 .../apache/doris/catalog/OdbcCatalogResource.java  |  28 ++
 .../java/org/apache/doris/catalog/OlapTable.java   |  15 +
 .../java/org/apache/doris/catalog/Resource.java    |  52 +++-
 .../java/org/apache/doris/catalog/ResourceMgr.java |  75 ++++-
 .../java/org/apache/doris/catalog/S3Resource.java  | 155 +++++++++++
 .../org/apache/doris/catalog/SparkResource.java    |  24 ++
 .../org/apache/doris/catalog/TableProperty.java    |  21 +-
 .../doris/clone/DynamicPartitionScheduler.java     |   2 +-
 .../doris/common/proc/PartitionsProcDir.java       |   3 +
 .../apache/doris/common/util/PropertyAnalyzer.java |  89 +++++-
 .../org/apache/doris/journal/JournalEntity.java    |   3 +-
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   4 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../java/org/apache/doris/alter/AlterTest.java     | 169 +++++++++++-
 .../doris/analysis/CreateResourceStmtTest.java     |  11 +
 .../org/apache/doris/catalog/DataPropertyTest.java |   5 +-
 .../org/apache/doris/catalog/ResourceMgrTest.java  | 101 +++++--
 .../org/apache/doris/catalog/S3ResourceTest.java   | 195 +++++++++++++
 .../org/apache/doris/clone/DiskRebalanceTest.java  |   3 -
 .../apache/doris/common/PropertyAnalyzerTest.java  |   2 +-
 45 files changed, 1809 insertions(+), 487 deletions(-)

diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 4edd6a6f73..e28083d851 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -623,6 +623,7 @@ module.exports = [
             initialOpenGroupIndex: -1,
             children: [
               "ALTER DATABASE",
+              "ALTER RESOURCE",
               "ALTER TABLE",
               "ALTER VIEW",
               "BACKUP",
@@ -634,6 +635,7 @@ module.exports = [
               "CREATE INDEX",
               "CREATE MATERIALIZED VIEW",
               "CREATE REPOSITORY",
+              "CREATE RESOURCE",
               "CREATE TABLE LIKE",
               "CREATE TABLE",
               "CREATE VIEW",
@@ -643,6 +645,7 @@ module.exports = [
               "DROP INDEX",
               "DROP MATERIALIZED VIEW",
               "DROP REPOSITORY",
+              "DROP RESOURCE",
               "DROP TABLE",
               "DROP VIEW",
               "HLL",
@@ -651,6 +654,7 @@ module.exports = [
               "REFRESH TABLE",
               "RESTORE",
               "SHOW ENCRYPTKEYS",
+              "SHOW RESOURCES",
               "TRUNCATE TABLE",
               "create-function",
               "drop-function",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index fcd0cbdb76..83e7f22f9b 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -637,6 +637,7 @@ module.exports = [
             initialOpenGroupIndex: -1,
             children: [
               "ALTER DATABASE",
+              "ALTER RESOURCE",
               "ALTER TABLE",
               "ALTER VIEW",
               "BACKUP",
diff --git a/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md b/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md
index 43f1dc2843..f62ce1ecf5 100644
--- a/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md	
+++ b/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md	
@@ -25,114 +25,117 @@ under the License.
 -->
 
 # ALTER SYSTEM
-## Description
-
-This statement is used to operate on nodes in a system. (Administrator only!)
-Grammar:
-1) Adding nodes (without multi-tenant functionality, add in this way)
-ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
-2) Adding idle nodes (that is, adding BACKEND that does not belong to any cluster)
-ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
-3) Adding nodes to a cluster
-ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
-4) Delete nodes
-ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
-5) Node offline
-ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
-6)226;- 21152;-Broker
-ALTER SYSTEM ADD BROKER broker_name "host:port"[,"host:port"...];
-(7) 20943;"23569;" Broker
-ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
-8) Delete all Brokers
-ALTER SYSTEM DROP ALL BROKER broker_name
-9) Set up a Load error hub for centralized display of import error information
-ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
-10) Modify property of BE
-ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
-
-Explain:
-1) Host can be hostname or IP address
-2) heartbeat_port is the heartbeat port of the node
-3) Adding and deleting nodes are synchronous operations. These two operations do not take into account the existing data on the node, the node is directly deleted from the metadata, please use cautiously.
-4) Node offline operations are used to secure offline nodes. This operation is asynchronous. If successful, the node will eventually be removed from the metadata. If it fails, the offline will not be completed.
-5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details
-6) Load error hub:
-   Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
-   If you need to delete the current load error hub, you can set type to null.
-   1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
-
-     Hub of Mysql type needs to specify the following parameters:
-     host: mysql host
-     port: mysql port
-     user: mysql user
-     password: mysql password
-     database mysql database
-     table: mysql table
-
-   2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
-     Hub of Broker type needs to specify the following parameters:
-     Broker: Name of broker
-     Path: Remote Storage Path
-     Other properties: Other information necessary to access remote storage, such as authentication information.
-
-7) Modify BE node attributes currently supports the following attributes:
-   1. tag.location:Resource tag
-   2. disable_query: Query disabled attribute
-   3. disable_load: Load disabled attribute
-
-## example
-
-1. Add a node
-ALTER SYSTEM ADD BACKEND "host:port";
-
-2. Adding an idle node
-ALTER SYSTEM ADD FREE BACKEND "host:port";
-
-3. Delete two nodes
-ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
 
-4. offline two nodes
-ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
-
-5. Add two Hdfs Broker
-ALTER SYSTEM ADD BROKER hdfs "host1:port", "host2:port";
-
-6. Add a load error hub of Mysql type
-ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
-("type"= "mysql",
-"host" = "192.168.1.17"
-"port" = "3306",
-"User" = "my" name,
-"password" = "my_passwd",
-"database" = "doris_load",
-"table" = "load_errors"
-);
-
-7. 添加一个 Broker 类型的 load error hub
-ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
-("type"= "broker",
-"Name" = BOS,
-"path" = "bos://backup-cmy/logs",
-"bos_endpoint" ="http://gz.bcebos.com",
-"bos_accesskey" = "069fc278xxxxxx24ddb522",
-"bos_secret_accesskey"="700adb0c6xxxxxx74d59eaa980a"
-);
-
-8. Delete the current load error hub
-ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
-("type"= "null");
-
-9. Modify BE resource tag
-
-ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
-
-10. Modify the query disabled attribute of BE
-
-ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
+## Description
 
-11. Modify the load disabled attribute of BE
-       
-ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true"); 
+    This statement is used to operate on nodes in a system. (Administrator only!)
+    
+    Syntax:
+        1) Adding nodes (without multi-tenant functionality, add in this way)
+            ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
+        2) Adding idle nodes (that is, adding BACKEND that does not belong to any cluster)
+            ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
+        3) Adding nodes to a cluster
+            ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
+        4) Delete nodes
+            ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
+        5) Node offline
+            ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
+        6) Add Broker
+            ALTER SYSTEM ADD BROKER broker_name "host:port"[,"host:port"...];
+        7) Drop Broker
+            ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
+        8) Delete all Brokers
+            ALTER SYSTEM DROP ALL BROKER broker_name
+        9) Set up a Load error hub for centralized display of import error information
+            ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
+        10) Modify property of BE
+            ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
+
+    Explain:
+        1) Host can be hostname or IP address
+        2) heartbeat_port is the heartbeat port of the node
+        3) Adding and deleting nodes are synchronous operations. These two operations do not take into account the existing data on the node, the node is directly deleted from the metadata, please use cautiously.
+        4) Node offline operations are used to secure offline nodes. This operation is asynchronous. If successful, the node will eventually be removed from the metadata. If it fails, the offline will not be completed.
+        5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details
+        6) Load error hub:
+           Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
+           If you need to delete the current load error hub, you can set type to null.
+           1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
+        
+             Hub of Mysql type needs to specify the following parameters:
+             host: mysql host
+             port: mysql port
+             user: mysql user
+             password: mysql password
+             database mysql database
+             table: mysql table
+        
+           2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
+             Hub of Broker type needs to specify the following parameters:
+             Broker: Name of broker
+             Path: Remote Storage Path
+             Other properties: Other information necessary to access remote storage, such as authentication information.
+        
+        7) Modify BE node attributes currently supports the following attributes:
+           1. tag.location:Resource tag
+           2. disable_query: Query disabled attribute
+           3. disable_load: Load disabled attribute
+
+## Example
+
+    1. Add a node
+        ALTER SYSTEM ADD BACKEND "host:port";
+    
+    2. Adding an idle node
+        ALTER SYSTEM ADD FREE BACKEND "host:port";
+    
+    3. Delete two nodes
+        ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
+    
+    4. offline two nodes
+        ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
+    
+    5. Add two Hdfs Broker
+        ALTER SYSTEM ADD BROKER hdfs "host1:port", "host2:port";
+    
+    6. Add a load error hub of Mysql type
+        ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
+        ("type"= "mysql",
+        "host" = "192.168.1.17"
+        "port" = "3306",
+        "User" = "my" name,
+        "password" = "my_passwd",
+        "database" = "doris_load",
+        "table" = "load_errors"
+        );
+    
+    7. 添加一个 Broker 类型的 load error hub
+        ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
+        ("type"= "broker",
+        "Name" = BOS,
+        "path" = "bos://backup-cmy/logs",
+        "bos_endpoint" ="http://gz.bcebos.com",
+        "bos_accesskey" = "069fc278xxxxxx24ddb522",
+        "bos_secret_accesskey"="700adb0c6xxxxxx74d59eaa980a"
+        );
+    
+    8. Delete the current load error hub
+        ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
+        ("type"= "null");
+    
+    9. Modify BE resource tag
+    
+        ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
+    
+    10. Modify the query disabled attribute of BE
+    
+        ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
+    
+    11. Modify the load disabled attribute of BE
+           
+        ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true"); 
 
 ## keyword
-AGE,SYSTEM,BACKGROUND,BROKER,FREE
+
+    AGE, SYSTEM, BACKGROUND, BROKER, FREE
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md b/docs/en/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md
new file mode 100644
index 0000000000..1d1361cfa4
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md	
@@ -0,0 +1,48 @@
+---
+{
+"title": "ALTER RESOURCE",
+"language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ALTER RESOURCE
+
+## Description
+
+    This statement is used to modify an existing resource. Only the root or admin user can modify resources.
+    Syntax:
+        ALTER RESOURCE 'resource_name'
+        PROPERTIES ("key"="value", ...);
+
+    Note: The resource type does not support modification.
+
+## Example
+
+    1. Modify the working directory of the Spark resource named spark0:
+        ALTER RESOURCE 'spark0' PROPERTIES ("working_dir" = "hdfs://127.0.0.1:10000/tmp/doris_new");
+
+    2. Modify the maximum number of connections to the S3 resource named remote_s3:
+        ALTER RESOURCE 'remote_s3' PROPERTIES ("s3_max_connections" = "100");
+
+## keyword
+
+    ALTER, RESOURCE
\ No newline at end of file
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
index 3cb5f5304a..ad99e6d5a9 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
+++ b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
@@ -71,6 +71,7 @@ under the License.
             1) The following attributes of the modified partition are currently supported.
                 - storage_medium
                 - storage_cooldown_time
+                - remote_storage_cooldown_time
                 - replication_num 
                 — in_memory
             2) For single-partition tables, partition_name is the same as the table name.
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md
new file mode 100644
index 0000000000..177454e70f
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md	
@@ -0,0 +1,134 @@
+---
+{
+    "title": "CREATE RESOURCE",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# CREATE RESOURCE
+
+## Description
+
+    This statement is used to create a resource. Only the root or admin user can create resources. Currently supports Spark, ODBC, S3 external resources.
+    In the future, other external resources may be added to Doris for use, such as Spark/GPU for query, HDFS/S3 for external storage, MapReduce for ETL, etc.
+
+    Syntax:
+         CREATE [EXTERNAL] RESOURCE "resource_name"
+         PROPERTIES ("key"="value", ...);
+            
+    Explanation:
+        1. The type of resource needs to be specified in PROPERTIES "type" = "[spark|odbc_catalog|s3]", currently supports spark, odbc_catalog, s3.
+        2. The PROPERTIES varies according to the resource type, see the example for details.
+
+## Example
+
+    1. Create a Spark resource named spark0 in yarn cluster mode.
+
+    ````
+        CREATE EXTERNAL RESOURCE "spark0"
+        PROPERTIES
+        (
+          "type" = "spark",
+          "spark.master" = "yarn",
+          "spark.submit.deployMode" = "cluster",
+          "spark.jars" = "xxx.jar,yyy.jar",
+          "spark.files" = "/tmp/aaa,/tmp/bbb",
+          "spark.executor.memory" = "1g",
+          "spark.yarn.queue" = "queue0",
+          "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
+          "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
+          "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
+          "broker" = "broker0",
+          "broker.username" = "user0",
+          "broker.password" = "password0"
+        );
+    ````
+                                                                                                                                                                                                              
+    Spark related parameters are as follows:
+    - spark.master: Required, currently supports yarn, spark://host:port.
+    - spark.submit.deployMode: The deployment mode of the Spark program, required, supports both cluster and client.
+    - spark.hadoop.yarn.resourcemanager.address: Required when master is yarn.
+    - spark.hadoop.fs.defaultFS: Required when master is yarn.
+    - Other parameters are optional, refer to http://spark.apache.org/docs/latest/configuration.html
+    
+    Working_dir and broker need to be specified when Spark is used for ETL. described as follows:
+        working_dir: The directory used by the ETL. Required when spark is used as an ETL resource. For example: hdfs://host:port/tmp/doris.
+        broker: broker name. Required when spark is used as an ETL resource. Configuration needs to be done in advance using the `ALTER SYSTEM ADD BROKER` command.
+        broker.property_key: The authentication information that the broker needs to specify when reading the intermediate file generated by ETL.
+
+    2. Create an ODBC resource
+
+    ````
+        CREATE EXTERNAL RESOURCE `oracle_odbc`
+        PROPERTIES (
+        "type" = "odbc_catalog",
+        "host" = "192.168.0.1",
+        "port" = "8086",
+        "user" = "test",
+        "password" = "test",
+        "database" = "test",
+        "odbc_type" = "oracle",
+        "driver" = "Oracle 19 ODBC driver"
+        );
+    ````
+
+    The relevant parameters of ODBC are as follows:
+    - hosts: IP address of the external database
+    - driver: The driver name of the ODBC appearance, which must be the same as the Driver name in be/conf/odbcinst.ini.
+    - odbc_type: the type of the external database, currently supports oracle, mysql, postgresql
+    - user: username of the foreign database
+    - password: the password information of the corresponding user
+
+    3. Create S3 resource
+    
+    ````
+    CREATE RESOURCE "remote_s3"
+    PROPERTIES
+    (
+    "type" = "s3",
+    "s3_endpoint" = "http://bj.s3.com",
+    "s3_region" = "bj",
+    "s3_root_path" = "/path/to/root",
+    "s3_access_key" = "bbb",
+    "s3_secret_key" = "aaaa",
+    "s3_max_connections" = "50",
+    "s3_request_timeout_ms" = "3000",
+    "s3_connection_timeout_ms" = "1000"
+    );
+    ````
+
+    S3 related parameters are as follows:
+    - required
+        - s3_endpoint: s3 endpoint
+        - s3_region: s3 region
+        - s3_root_path: s3 root directory
+        - s3_access_key: s3 access key
+        - s3_secret_key: s3 secret key
+    - optional
+        - s3_max_connections: the maximum number of s3 connections, the default is 50
+        - s3_request_timeout_ms: s3 request timeout, in milliseconds, the default is 3000
+        - s3_connection_timeout_ms: s3 connection timeout, in milliseconds, the default is 1000
+
+
+## keyword
+
+    CREATE, RESOURCE
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index e89cbb7d2e..88b843ddfe 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md	
+++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md	
@@ -297,18 +297,24 @@ Syntax:
         PROPERTIES (
             "storage_medium" = "[SSD|HDD]",
             ["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
+            ["remote_storage_resource" = "xxx"],
+            ["remote_storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
             ["replication_num" = "3"],
-			["replication_allocation" = "xxx"]
+            ["replication_allocation" = "xxx"]
             )
         ```
     
-        storage_medium:         SSD or HDD, The default initial storage media can be specified by `default_storage_medium= XXX` in the fe configuration file `fe.conf`, or, if not, by default, HDD.
-                                Note: when FE configuration 'enable_strict_storage_medium_check' is' True ', if the corresponding storage medium is not set in the cluster, the construction clause 'Failed to find enough host in all backends with storage medium is SSD|HDD'.
-        storage_cooldown_time:  If storage_medium is SSD, data will be automatically moved to HDD   when timeout.
-                                Default is 30 days.
-                                Format: "yyyy-MM-dd HH:mm:ss"
-        replication_num:        Replication number of a partition. Default is 3.
-        replication_allocation:     Specify the distribution of replicas according to the resource tag.
+        storage_medium:                SSD or HDD, The default initial storage media can be specified by `default_storage_medium= XXX` in the fe configuration file `fe.conf`, or, if not, by default, HDD.
+                                       Note: when FE configuration 'enable_strict_storage_medium_check' is' True ', if the corresponding storage medium is not set in the cluster, the construction clause 'Failed to find enough host in all backends with storage medium is SSD|HDD'.
+        storage_cooldown_time:         If storage_medium is SSD, data will be automatically moved to HDD   when timeout.
+                                       Default is 30 days.
+                                       Format: "yyyy-MM-dd HH:mm:ss"
+        remote_storage_resource:       The remote storage resource name, which needs to be used in conjunction with the storage_cold_medium parameter.
+        remote_storage_cooldown_time:  Used in conjunction with remote_storage_resource. Indicates the expiration time of the partition stored locally.
+                                       Does not expire by default. Must be later than storage_cooldown_time if used with it.
+                                       The format is: "yyyy-MM-dd HH:mm:ss"
+        replication_num:               Replication number of a partition. Default is 3.
+        replication_allocation:        Specify the distribution of replicas according to the resource tag.
 
         If table is not range partitions. This property takes on Table level. Or it will takes on   Partition level.
         User can specify different properties for different partition by `ADD PARTITION` or     `MODIFY PARTITION` statements.
@@ -353,7 +359,7 @@ Syntax:
        dynamic_partition.reserved_history_periods: Used to specify the range of reserved history periods
        
        ```
-    5)  You can create multiple Rollups in bulk when building a table
+    5) You can create multiple Rollups in bulk when building a table
     grammar:
     ```
       ROLLUP (rollup_name (column_name1, column_name2, ...)
@@ -405,68 +411,89 @@ Syntax:
     "storage_cooldown_time" = "2015-06-04 00:00:00"
     );
     ```
-
-3. Create an olap table, with range partitioned, distributed by hash. Records with the same key exist at the same time, set the initial storage medium and cooling time, use default column storage.
-
-1) LESS THAN
-
+   
+3. Create an olap table, distributed by hash, with aggregation type. Also set storage medium and cooldown time.
+   Setting up remote storage resource and cold data storage media.
     ```
-    CREATE TABLE example_db.table_range
+    CREATE TABLE example_db.table_hash
     (
-    k1 DATE,
-    k2 INT,
-    k3 SMALLINT,
-    v1 VARCHAR(2048),
-    v2 DATETIME DEFAULT "2014-02-04 15:36:00"
+    k1 BIGINT,
+    k2 LARGEINT,
+    v1 VARCHAR(2048) REPLACE,
+    v2 SMALLINT SUM DEFAULT "10"
     )
     ENGINE=olap
-    DUPLICATE KEY(k1, k2, k3)
-    PARTITION BY RANGE (k1)
-    (
-    PARTITION p1 VALUES LESS THAN ("2014-01-01"),
-    PARTITION p2 VALUES LESS THAN ("2014-06-01"),
-    PARTITION p3 VALUES LESS THAN ("2014-12-01")
-    )
-    DISTRIBUTED BY HASH(k2) BUCKETS 32
+    AGGREGATE KEY(k1, k2)
+    DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
     PROPERTIES(
-    "storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00"
+    "storage_medium" = "SSD",
+    "storage_cooldown_time" = "2015-06-04 00:00:00",
+    "remote_storage_resource" = "remote_s3",
+    "remote_storage_cooldown_time" = "2015-12-04 00:00:00"
     );
-    ```
+   ```
+
+4. Create an olap table, with range partitioned, distributed by hash. Records with the same key exist at the same time, set the initial storage medium and cooling time, use default column storage.
+
+   1) LESS THAN
+
+       ```
+       CREATE TABLE example_db.table_range
+       (
+       k1 DATE,
+       k2 INT,
+       k3 SMALLINT,
+       v1 VARCHAR(2048),
+       v2 DATETIME DEFAULT "2014-02-04 15:36:00"
+       )
+       ENGINE=olap
+       DUPLICATE KEY(k1, k2, k3)
+       PARTITION BY RANGE (k1)
+       (
+       PARTITION p1 VALUES LESS THAN ("2014-01-01"),
+       PARTITION p2 VALUES LESS THAN ("2014-06-01"),
+       PARTITION p3 VALUES LESS THAN ("2014-12-01")
+       )
+       DISTRIBUTED BY HASH(k2) BUCKETS 32
+       PROPERTIES(
+       "storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00"
+       );
+       ```
     
-    Explain:
-    This statement will create 3 partitions:
+       Explain:
+       This statement will create 3 partitions:
     
-    ```
-    ( {    MIN     },   {"2014-01-01"} )
-    [ {"2014-01-01"},   {"2014-06-01"} )
-    [ {"2014-06-01"},   {"2014-12-01"} )
-    ```
+       ```
+       ( {    MIN     },   {"2014-01-01"} )
+       [ {"2014-01-01"},   {"2014-06-01"} )
+       [ {"2014-06-01"},   {"2014-12-01"} )
+       ```
     
-    Data outside these ranges will not be loaded.
+       Data outside these ranges will not be loaded.
 
-2) Fixed Range
-    ```
-    CREATE TABLE table_range
-    (
-    k1 DATE,
-    k2 INT,
-    k3 SMALLINT,
-    v1 VARCHAR(2048),
-    v2 DATETIME DEFAULT "2014-02-04 15:36:00"
-    )
-    ENGINE=olap
-    DUPLICATE KEY(k1, k2, k3)
-    PARTITION BY RANGE (k1, k2, k3)
-    (
-    PARTITION p1 VALUES [("2014-01-01", "10", "200"), ("2014-01-01", "20", "300")),
-    PARTITION p2 VALUES [("2014-06-01", "100", "200"), ("2014-07-01", "100", "300"))
-    )
-    DISTRIBUTED BY HASH(k2) BUCKETS 32
-    PROPERTIES(
-    "storage_medium" = "SSD"
-    );
-    ```
-4. Create an olap table, with list partitioned, distributed by hash. Records with the same key exist at the same time, set the initial storage medium and cooling time, use default column storage.
+   2) Fixed Range
+       ```
+       CREATE TABLE table_range
+       (
+       k1 DATE,
+       k2 INT,
+       k3 SMALLINT,
+       v1 VARCHAR(2048),
+       v2 DATETIME DEFAULT "2014-02-04 15:36:00"
+       )
+       ENGINE=olap
+       DUPLICATE KEY(k1, k2, k3)
+       PARTITION BY RANGE (k1, k2, k3)
+       (
+       PARTITION p1 VALUES [("2014-01-01", "10", "200"), ("2014-01-01", "20", "300")),
+       PARTITION p2 VALUES [("2014-06-01", "100", "200"), ("2014-07-01", "100", "300"))
+       )
+       DISTRIBUTED BY HASH(k2) BUCKETS 32
+       PROPERTIES(
+       "storage_medium" = "SSD"
+       );
+       ```
+5. Create an olap table, with list partitioned, distributed by hash. Records with the same key exist at the same time, set the initial storage medium and cooling time, use default column storage.
 
     1) Single column partition
 
@@ -540,9 +567,9 @@ Syntax:
 
     Data that is not within these partition enumeration values will be filtered as illegal data
 
-5. Create a mysql table
-   5.1 Create MySQL table directly from external table information
-```
+6. Create a mysql table
+    6.1 Create MySQL table directly from external table information
+    ```
     CREATE EXTERNAL TABLE example_db.table_mysql
     (
     k1 DATE,
@@ -561,21 +588,20 @@ Syntax:
     "database" = "mysql_db_test",
     "table" = "mysql_table_test"
     )
-```
+    ```
 
-   5.2 Create MySQL table with external ODBC catalog resource
-```
-   CREATE EXTERNAL RESOURCE "mysql_resource" 
-   PROPERTIES
-   (
-     "type" = "odbc_catalog",
-     "user" = "mysql_user",
-     "password" = "mysql_passwd",
-     "host" = "127.0.0.1",
-     "port" = "8239"			
-   );
-```
-```
+    6.2 Create MySQL table with external ODBC catalog resource
+    ```
+    CREATE EXTERNAL RESOURCE "mysql_resource" 
+    PROPERTIES
+    (
+      "type" = "odbc_catalog",
+      "user" = "mysql_user",
+      "password" = "mysql_passwd",
+      "host" = "127.0.0.1",
+      "port" = "8239"			
+    );
+   
     CREATE EXTERNAL TABLE example_db.table_mysql
     (
     k1 DATE,
@@ -590,10 +616,10 @@ Syntax:
     "odbc_catalog_resource" = "mysql_resource",
     "database" = "mysql_db_test",
     "table" = "mysql_table_test"
-    )
-```
+    );
+    ```
 
-6. Create a broker table, with file on HDFS, line delimit by "|", column separated by "\n"
+7. Create a broker table, with file on HDFS, line delimit by "|", column separated by "\n"
 
     ```
     CREATE EXTERNAL TABLE example_db.table_broker (
@@ -616,7 +642,7 @@ Syntax:
     );
     ```
 
-7. Create table will HLL column
+8. Create table will HLL column
 
     ```
     CREATE TABLE example_db.example_table
@@ -631,7 +657,7 @@ Syntax:
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
     ```
 
-8. Create a table will BITMAP_UNION column
+9. Create a table will BITMAP_UNION column
 
     ```
     CREATE TABLE example_db.example_table
@@ -645,21 +671,21 @@ Syntax:
     AGGREGATE KEY(k1, k2)
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
     ```
-9. Create a table with QUANTILE_UNION column (the origin value of **v1** and **v2** columns must be **numeric** types)
+10. Create a table with QUANTILE_UNION column (the origin value of **v1** and **v2** columns must be **numeric** types)
     
-    ```
-    CREATE TABLE example_db.example_table
-    (
-    k1 TINYINT,
-    k2 DECIMAL(10, 2) DEFAULT "10.5",
-    v1 QUANTILE_STATE QUANTILE_UNION,
-    v2 QUANTILE_STATE QUANTILE_UNION
-    )
-    ENGINE=olap
-    AGGREGATE KEY(k1, k2)
-    DISTRIBUTED BY HASH(k1) BUCKETS 32;
-    ```
-10. Create 2 colocate join table.
+     ```
+     CREATE TABLE example_db.example_table
+     (
+     k1 TINYINT,
+     k2 DECIMAL(10, 2) DEFAULT "10.5",
+     v1 QUANTILE_STATE QUANTILE_UNION,
+     v2 QUANTILE_STATE QUANTILE_UNION
+     )
+     ENGINE=olap
+     AGGREGATE KEY(k1, k2)
+     DISTRIBUTED BY HASH(k1) BUCKETS 32;
+     ```
+11. Create 2 colocate join table.
 
     ```
     CREATE TABLE `t1` (
@@ -682,7 +708,7 @@ Syntax:
     );
     ```
 
-11. Create a broker table, with file on BOS.
+12. Create a broker table, with file on BOS.
 
     ```
     CREATE EXTERNAL TABLE example_db.table_broker (
@@ -700,7 +726,7 @@ Syntax:
     );
     ```
 
-12. Create a table with a bitmap index 
+13. Create a table with a bitmap index 
 
     ```
     CREATE TABLE example_db.table_hash
@@ -717,7 +743,7 @@ Syntax:
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
     ```
     
-13. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is' 2020-01-08 ', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created.
+14. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is' 2020-01-08 ', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created.
 
     ```
     [types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
@@ -726,29 +752,29 @@ Syntax:
     [types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
     ```
     
-     ```
-        CREATE TABLE example_db.dynamic_partition
-        (
-        k1 DATE,
-        k2 INT,
-        k3 SMALLINT,
-        v1 VARCHAR(2048),
-        v2 DATETIME DEFAULT "2014-02-04 15:36:00"
-        )
-        ENGINE=olap
-        DUPLICATE KEY(k1, k2, k3)
-        PARTITION BY RANGE (k1) ()
-        DISTRIBUTED BY HASH(k2) BUCKETS 32
-        PROPERTIES(
-        "storage_medium" = "SSD",
-        "dynamic_partition.time_unit" = "DAY",
-        "dynamic_partition.end" = "3",
-        "dynamic_partition.prefix" = "p",
-        "dynamic_partition.buckets" = "32"
-         );
-     ```
-14. Create a table with rollup index
-```
+    ```
+    CREATE TABLE example_db.dynamic_partition
+    (
+    k1 DATE,
+    k2 INT,
+    k3 SMALLINT,
+    v1 VARCHAR(2048),
+    v2 DATETIME DEFAULT "2014-02-04 15:36:00"
+    )
+    ENGINE=olap
+    DUPLICATE KEY(k1, k2, k3)
+    PARTITION BY RANGE (k1) ()
+    DISTRIBUTED BY HASH(k2) BUCKETS 32
+    PROPERTIES(
+    "storage_medium" = "SSD",
+    "dynamic_partition.time_unit" = "DAY",
+    "dynamic_partition.end" = "3",
+    "dynamic_partition.prefix" = "p",
+    "dynamic_partition.buckets" = "32"
+    );
+    ```
+15. Create a table with rollup index
+    ```
     CREATE TABLE example_db.rolup_index_table
     (
         event_day DATE,
@@ -765,11 +791,11 @@ Syntax:
     r3(event_day)
     )
     PROPERTIES("replication_num" = "3");
-```
+    ```
 
-15. Create a inmemory table:
+16. Create a inmemory table:
 
-```
+    ```
     CREATE TABLE example_db.table_hash
     (
     k1 TINYINT,
@@ -783,10 +809,10 @@ Syntax:
     COMMENT "my first doris table"
     DISTRIBUTED BY HASH(k1) BUCKETS 32
     PROPERTIES ("in_memory"="true");
-```
+    ```
 
-16. Create a hive external table
-```
+17. Create a hive external table
+    ```
     CREATE TABLE example_db.table_hive
     (
       k1 TINYINT,
@@ -800,11 +826,11 @@ Syntax:
       "table" = "hive_table_name",
       "hive.metastore.uris" = "thrift://127.0.0.1:9083"
     );
-```
+    ```
 
-17. Specify the replica distribution of the table through replication_allocation
+18. Specify the replica distribution of the table through replication_allocation
 
-```	
+    ```	
     CREATE TABLE example_db.table_hash
     (
     k1 TINYINT,
@@ -812,9 +838,9 @@ Syntax:
     )
     DISTRIBUTED BY HASH(k1) BUCKETS 32
     PROPERTIES (
-		"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
-	);
-
+        "replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
+    );
+    
     CREATE TABLE example_db.dynamic_partition
     (
     k1 DATE,
@@ -833,11 +859,11 @@ Syntax:
     "dynamic_partition.buckets" = "32",
     "dynamic_partition."replication_allocation" = "tag.location.group_a:3"
      );
-```
+    ```
 
-17. Create an Iceberg external table
+19. Create an Iceberg external table
 
-```
+    ```
     CREATE TABLE example_db.t_iceberg 
     ENGINE=ICEBERG
     PROPERTIES (
@@ -846,7 +872,7 @@ Syntax:
     "iceberg.hive.metastore.uris"  =  "thrift://127.0.0.1:9083",
     "iceberg.catalog.type"  =  "HIVE_CATALOG"
     );
-```
+    ```
 
 ## keyword
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md b/docs/en/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md
similarity index 76%
copy from docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md
copy to docs/en/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md
index 07d87021fd..66342cada9 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md	
+++ b/docs/en/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md	
@@ -1,7 +1,7 @@
 ---
 {
     "title": "DROP RESOURCE",
-    "language": "zh-CN"
+    "language": "en"
 }
 ---
 
@@ -25,15 +25,22 @@ under the License.
 -->
 
 # DROP RESOURCE
-## description
-    该语句用于删除一个已有的资源。仅 root 或 admin 用户可以删除资源。
-    语法:
+
+## Description
+
+    This statement is used to delete an existing resource. Only the root or admin user can delete resources.
+    
+    Syntax:
         DROP RESOURCE 'resource_name'
 
-## example
-    1. 删除名为 spark0 的 Spark 资源:
+    Note: ODBC/S3 resources that are in use cannot be deleted.
+
+## Example
+
+    1. Delete the Spark resource named spark0:
         DROP RESOURCE 'spark0';
 
+
 ## keyword
-    DROP, RESOURCE
 
+    DROP, RESOURCE
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md b/docs/en/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md
index d77593e338..8ed9d60f55 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md	
+++ b/docs/en/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md	
@@ -25,17 +25,19 @@ under the License.
 -->
 
 # SHOW RESOURCES
-## description
 
-    This statement is used to display the resources that the user has permission to use. Ordinary users can only display the resources with permission, while root or admin users can display all the resources.
+## Description
+
+    This statement is used to display the resources that the user has permission to use. 
+    Ordinary users can only display the resources with permission, while root or admin users can display all the resources.
     
-    Grammar
+    Syntax:
     
         SHOW RESOURCES
         [
             WHERE 
             [NAME [ = "your_resource_name" | LIKE "name_matcher"]]
-            [RESOURCETYPE = ["SPARK"]]
+            [RESOURCETYPE = ["[spark|odbc_catalog|s3]"]]
         ]
         [ORDER BY ...]
         [LIMIT limit][OFFSET offset];
@@ -48,7 +50,8 @@ under the License.
         5) If LIMIT is specified, limit matching records are displayed. Otherwise, it is all displayed.
         6) If OFFSET is specified, the query results are displayed starting with the offset offset. The offset is 0 by default.
 
-## example
+## Example
+
     1. Display all resources that the current user has permissions on
         SHOW RESOURCES;
     
@@ -60,5 +63,5 @@ under the License.
 
 
 ## keyword
-    SHOW, RESOURCES
 
+    SHOW RESOURCES, RESOURCES
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md
similarity index 56%
copy from docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md
copy to docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md
index 07d87021fd..79cd27861f 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER RESOURCE.md	
@@ -1,6 +1,6 @@
 ---
 {
-    "title": "DROP RESOURCE",
+    "title": "ALTER RESOURCE",
     "language": "zh-CN"
 }
 ---
@@ -24,16 +24,25 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# DROP RESOURCE
-## description
-    该语句用于删除一个已有的资源。仅 root 或 admin 用户可以删除资源。
+# ALTER RESOURCE
+
+## Description
+    
+    该语句用于修改一个已有的资源。仅 root 或 admin 用户可以修改资源。
     语法:
-        DROP RESOURCE 'resource_name'
+        ALTER RESOURCE 'resource_name'
+        PROPERTIES ("key"="value", ...);
+
+    注意:resource type 不支持修改。
+
+## Example
+    
+    1. 修改名为 spark0 的 Spark 资源的工作目录:
+        ALTER RESOURCE 'spark0' PROPERTIES ("working_dir" = "hdfs://127.0.0.1:10000/tmp/doris_new");
 
-## example
-    1. 删除名为 spark0 的 Spark 资源:
-        DROP RESOURCE 'spark0';
+    2. 修改名为 remote_s3 的 S3 资源的最大连接数:
+        ALTER RESOURCE 'remote_s3' PROPERTIES ("s3_max_connections" = "100");
 
 ## keyword
-    DROP, RESOURCE
 
+    ALTER, RESOURCE
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
index 9a11ef2a73..5201cea9b8 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md	
@@ -71,6 +71,7 @@ under the License.
             1) 当前支持修改分区的下列属性:
                 - storage_medium
                 - storage_cooldown_time
+                - remote_storage_cooldown_time
                 - replication_num 
                 — in_memory
             2) 对于单分区表,partition_name 同表名。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md
index e558109214..4045fb202c 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE RESOURCE.md	
@@ -25,21 +25,27 @@ under the License.
 -->
 
 # CREATE RESOURCE
-## description
-    该语句用于创建资源。仅 root 或 admin 用户可以创建资源。目前仅支持 Spark 外部资源。将来其他外部资源可能会加入到 Doris 中使用,如 Spark/GPU 用于查询,HDFS/S3 用于外部存储,MapReduce 用于 ETL 等。
+
+## Description
+    
+    该语句用于创建资源。仅 root 或 admin 用户可以创建资源。目前支持 Spark, ODBC, S3 外部资源。
+    将来其他外部资源可能会加入到 Doris 中使用,如 Spark/GPU 用于查询,HDFS/S3 用于外部存储,MapReduce 用于 ETL 等。
     语法:
         CREATE [EXTERNAL] RESOURCE "resource_name"
         PROPERTIES ("key"="value", ...);
             
     说明:
-        1. PROPERTIES中需要指定资源的类型 "type" = "spark",目前仅支持 spark。
+        1. PROPERTIES中需要指定资源的类型 "type" = "[spark|odbc_catalog|s3]",目前支持 spark, odbc_catalog, s3。
         2. 根据资源类型的不同 PROPERTIES 有所不同,具体见示例。
 
-## example
+## Example
+
     1. 创建yarn cluster 模式,名为 spark0 的 Spark 资源。
-      CREATE EXTERNAL RESOURCE "spark0"
-      PROPERTIES
-      (
+
+    ```        
+        CREATE EXTERNAL RESOURCE "spark0"
+        PROPERTIES
+        (
           "type" = "spark",
           "spark.master" = "yarn",
           "spark.submit.deployMode" = "cluster",
@@ -53,21 +59,74 @@ under the License.
           "broker" = "broker0",
           "broker.username" = "user0",
           "broker.password" = "password0"
-      );
-      
+        );
+    ```
                                                                                                                                                                                                               
-     Spark 相关参数如下:                                                              
-     1. spark.master: 必填,目前支持yarn,spark://host:port。                         
-     2. spark.submit.deployMode: Spark 程序的部署模式,必填,支持 cluster,client 两种。
-     3. spark.hadoop.yarn.resourcemanager.address: master为yarn时必填。               
-     4. spark.hadoop.fs.defaultFS: master为yarn时必填。                               
-     5. 其他参数为可选,参考http://spark.apache.org/docs/latest/configuration.html 
-     
-     Spark 用于 ETL 时需要指定 working_dir 和 broker。说明如下:
-     working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
-     broker: broker 名字。spark作为ETL资源使用时必填。需要使用`ALTER SYSTEM ADD BROKER` 命令提前完成配置。
-     broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。
+    Spark 相关参数如下:                                                              
+    - spark.master: 必填,目前支持yarn,spark://host:port。                         
+    - spark.submit.deployMode: Spark 程序的部署模式,必填,支持 cluster,client 两种。
+    - spark.hadoop.yarn.resourcemanager.address: master为yarn时必填。               
+    - spark.hadoop.fs.defaultFS: master为yarn时必填。                               
+    - 其他参数为可选,参考http://spark.apache.org/docs/latest/configuration.html 
+    
+    Spark 用于 ETL 时需要指定 working_dir 和 broker。说明如下:
+        working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
+        broker: broker 名字。spark作为ETL资源使用时必填。需要使用`ALTER SYSTEM ADD BROKER` 命令提前完成配置。
+        broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。
+
+    2. 创建 ODBC resource
+
+    ```
+        CREATE EXTERNAL RESOURCE `oracle_odbc`
+        PROPERTIES (
+        "type" = "odbc_catalog",
+        "host" = "192.168.0.1",
+        "port" = "8086",
+        "user" = "test",
+        "password" = "test",
+        "database" = "test",
+        "odbc_type" = "oracle",
+        "driver" = "Oracle 19 ODBC driver"
+        );
+    ```
+
+    ODBC 的相关参数如下:
+    - hosts:外表数据库的IP地址
+    - driver:ODBC外表的Driver名,该名字需要和be/conf/odbcinst.ini中的Driver名一致。
+    - odbc_type:外表数据库的类型,当前支持oracle, mysql, postgresql
+    - user:外表数据库的用户名
+    - password:对应用户的密码信息
+
+    3. 创建 S3 resource 
+    
+    ```
+    CREATE RESOURCE "remote_s3"
+    PROPERTIES
+    (
+    "type" = "s3",
+    "s3_endpoint" = "http://bj.s3.com",
+    "s3_region" = "bj",
+    "s3_root_path" = "/path/to/root",
+    "s3_access_key" = "bbb",
+    "s3_secret_key" = "aaaa",
+    "s3_max_connections" = "50",
+    "s3_request_timeout_ms" = "3000",
+    "s3_connection_timeout_ms" = "1000"
+    );
+    ```
+
+    S3 相关参数如下:
+    - 必需参数
+        - s3_endpoint:s3 endpoint
+        - s3_region:s3 region
+        - s3_root_path:s3 根目录
+        - s3_access_key:s3 access key
+        - s3_secret_key:s3 secret key
+    - 可选参数
+        - s3_max_connections:s3 最大连接数量,默认为 50
+        - s3_request_timeout_ms:s3 请求超时时间,单位毫秒,默认为 3000
+        - s3_connection_timeout_ms:s3 连接超时时间,单位毫秒,默认为 1000
 
 ## keyword
-    CREATE, RESOURCE
 
+    CREATE, RESOURCE
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index ae51ab323a..ea39f15de6 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md	
@@ -308,18 +308,24 @@ under the License.
        PROPERTIES (
            "storage_medium" = "[SSD|HDD]",
            ["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
+           ["remote_storage_resource" = "xxx"],
+           ["remote_storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
            ["replication_num" = "3"]
            ["replication_allocation" = "xxx"]
            )
     ```
 
-       storage_medium:        用于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
-                               注意:当FE配置项 `enable_strict_storage_medium_check` 为 `True` 时,若集群中没有设置对应的存储介质时,建表语句会报错 `Failed to find enough host in all backends with storage medium is SSD|HDD`. 
-       storage_cooldown_time: 当设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
-                               默认存放 30 天。
-                               格式为:"yyyy-MM-dd HH:mm:ss"
-       replication_num:        指定分区的副本数。默认为 3。
-       replication_allocation:     按照资源标签来指定副本分布。
+       storage_medium:                于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
+                                       注意:当FE配置项 `enable_strict_storage_medium_check` 为 `True` 时,若集群中没有设置对应的存储介质时,建表语句会报错 `Failed to find enough host in all backends with storage medium is SSD|HDD`. 
+       storage_cooldown_time:         设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
+                                       默认存放 30 天。
+                                       格式为:"yyyy-MM-dd HH:mm:ss"
+       remote_storage_resource:        远端存储资源名称,需要与 remote_storage_cooldown_time 参数搭配使用。
+       remote_storage_cooldown_time:   与 remote_storage_resource 搭配使用。表示该分区在本地存储的到期时间。
+                                       默认不过期。如果与 storage_cooldown_time 搭配使用必须晚于该时间。
+                                       格式为:"yyyy-MM-dd HH:mm:ss"
+       replication_num:                指定分区的副本数。默认为 3。
+       replication_allocation:         按照资源标签来指定副本分布。
     
        当表为单分区表时,这些属性为表的属性。
            当表为两级分区时,这些属性为附属于每一个分区。
@@ -329,31 +335,31 @@ under the License.
            bloom filter 索引仅适用于查询条件为 in 和 equal 的情况,该列的值越分散效果越好
            目前只支持以下情况的列:除了 TINYINT FLOAT DOUBLE 类型以外的 key 列及聚合方法为 REPLACE 的 value 列
 
-```
+    ```
        PROPERTIES (
            "bloom_filter_columns"="k1,k2,k3"
            )
-```
+    ```
 
     3) 如果希望使用 Colocate Join 特性,需要在 properties 中指定
 
-```
-       PROPERTIES (
-           "colocate_with"="table1"
-           )
-```
+    ```
+        PROPERTIES (
+            "colocate_with"="table1"
+            )
+    ```
 
     4) 如果希望使用动态分区特性,需要在properties 中指定。注意:动态分区只支持 RANGE 分区
 
-```
-      PROPERTIES (
-          "dynamic_partition.enable" = "true|false",
-          "dynamic_partition.time_unit" = "HOUR|DAY|WEEK|MONTH",
-          "dynamic_partition.start" = "${integer_value}",
-          "dynamic_partition.end" = "${integer_value}",
-          "dynamic_partition.prefix" = "${string_value}",
-          "dynamic_partition.buckets" = "${integer_value}
-```
+    ```
+        PROPERTIES (
+            "dynamic_partition.enable" = "true|false",
+            "dynamic_partition.time_unit" = "HOUR|DAY|WEEK|MONTH",
+            "dynamic_partition.start" = "${integer_value}",
+            "dynamic_partition.end" = "${integer_value}",
+            "dynamic_partition.prefix" = "${string_value}",
+            "dynamic_partition.buckets" = "${integer_value}
+    ```
     dynamic_partition.enable: 用于指定表级别的动态分区功能是否开启。默认为 true。
     dynamic_partition.time_unit: 用于指定动态添加分区的时间单位,可选择为HOUR(小时),DAY(天),WEEK(周),MONTH(月)。
                                  注意:以小时为单位的分区列,数据类型不能为 DATE。
@@ -375,20 +381,20 @@ under the License.
     
     6) 如果希望使用 内存表 特性,需要在 properties 中指定
 
-```
+    ```
         PROPERTIES (
            "in_memory"="true"
         )   
-```
+    ```
     当 in_memory 属性为 true 时,Doris会尽可能将该表的数据和索引Cache到BE 内存中
     
     7) 创建UNIQUE_KEYS表时,可以指定一个sequence列,当KEY列相同时,将按照sequence列进行REPLACE(较大值替换较小值,否则无法替换)
 
-```
+    ```
         PROPERTIES (
             "function_column.sequence_type" = 'Date',
         );
-```
+    ```
     sequence_type用来指定sequence列的类型,可以为整型和时间类型
 ## example
 
@@ -428,8 +434,29 @@ under the License.
     "storage_cooldown_time" = "2015-06-04 00:00:00"
     );
    ```
+3. 创建一个 olap 表,使用 Hash 分桶,使用列存,相同key的记录进行覆盖,设置初始存储介质和冷却时间
+   设置远端存储和冷数据存储介质
+
+   ```
+    CREATE TABLE example_db.table_hash
+    (
+    k1 BIGINT,
+    k2 LARGEINT,
+    v1 VARCHAR(2048) REPLACE,
+    v2 SMALLINT SUM DEFAULT "10"
+    )
+    ENGINE=olap
+    AGGREGATE KEY(k1, k2)
+    DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
+    PROPERTIES(
+    "storage_medium" = "SSD",
+    "storage_cooldown_time" = "2015-06-04 00:00:00",
+    "remote_storage_resource" = "remote_s3",
+    "remote_storage_cooldown_time" = "2015-12-04 00:00:00"
+    );
+   ```
 
-3. 创建一个 olap 表,使用 Range 分区,使用Hash分桶,默认使用列存,
+4. 创建一个 olap 表,使用 Range 分区,使用Hash分桶,默认使用列存,
    相同key的记录同时存在,设置初始存储介质和冷却时间
 
     1)LESS THAN
@@ -492,7 +519,7 @@ under the License.
     );
     ```
 
-4. 创建一个 olap 表,使用 List 分区,使用Hash分桶,默认使用列存,
+5. 创建一个 olap 表,使用 List 分区,使用Hash分桶,默认使用列存,
    相同key的记录同时存在,设置初始存储介质和冷却时间
 
     1)单列分区
@@ -567,10 +594,10 @@ under the License.
 
     不在这些分区枚举值内的数据将视为非法数据被过滤
 
-5. 创建一个 mysql 表
+6. 创建一个 mysql 表
 
-   5.1 直接通过外表信息创建mysql表
-```
+   6.1 直接通过外表信息创建mysql表
+    ```
     CREATE EXTERNAL TABLE example_db.table_mysql
     (
     k1 DATE,
@@ -589,21 +616,19 @@ under the License.
     "database" = "mysql_db_test",
     "table" = "mysql_table_test"
     )
-```
+    ```
 
-   5.2 通过External Catalog Resource创建mysql表
-```
-   CREATE EXTERNAL RESOURCE "mysql_resource" 
-   PROPERTIES
-   (
-     "type" = "odbc_catalog",
-     "user" = "mysql_user",
-     "password" = "mysql_passwd",
-     "host" = "127.0.0.1",
-      "port" = "8239"			
-   );
-```
-```
+   6.2 通过External Catalog Resource创建mysql表
+    ```
+    CREATE EXTERNAL RESOURCE "mysql_resource" 
+    PROPERTIES
+    (
+      "type" = "odbc_catalog",
+      "user" = "mysql_user",
+      "password" = "mysql_passwd",
+      "host" = "127.0.0.1",
+       "port" = "8239"			
+    );
     CREATE EXTERNAL TABLE example_db.table_mysql
     (
     k1 DATE,
@@ -619,11 +644,11 @@ under the License.
     "database" = "mysql_db_test",
     "table" = "mysql_table_test"
     )
-```
+    ```
 
-6. 创建一个数据文件存储在HDFS上的 broker 外部表, 数据使用 "|" 分割,"\n" 换行
+7. 创建一个数据文件存储在HDFS上的 broker 外部表, 数据使用 "|" 分割,"\n" 换行
 
-```
+    ```
     CREATE EXTERNAL TABLE example_db.table_broker (
     k1 DATE,
     k2 INT,
@@ -642,11 +667,11 @@ under the License.
     "username" = "hdfs_user",
     "password" = "hdfs_password"
     )
-```
+    ```
 
-7. 创建一张含有HLL列的表
+8. 创建一张含有HLL列的表
 
-```
+    ```
     CREATE TABLE example_db.example_table
     (
     k1 TINYINT,
@@ -657,11 +682,11 @@ under the License.
     ENGINE=olap
     AGGREGATE KEY(k1, k2)
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
-```
+    ```
 
-8. 创建一张含有BITMAP_UNION聚合类型的表(v1和v2列的原始数据类型必须是TINYINT,SMALLINT,INT)
+9. 创建一张含有BITMAP_UNION聚合类型的表(v1和v2列的原始数据类型必须是TINYINT,SMALLINT,INT)
 
-```
+    ```
     CREATE TABLE example_db.example_table
     (
     k1 TINYINT,
@@ -672,26 +697,26 @@ under the License.
     ENGINE=olap
     AGGREGATE KEY(k1, k2)
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
-```
+    ```
 
-1. 创建一张含有QUANTILE_UNION聚合类型的表(v1和v2列的原始数据类型必须是数值类型)
+10. 创建一张含有QUANTILE_UNION聚合类型的表(v1和v2列的原始数据类型必须是数值类型)
 
-```
-    CREATE TABLE example_db.example_table
-    (
-    k1 TINYINT,
-    k2 DECIMAL(10, 2) DEFAULT "10.5",
-    v1 QUANTILE_STATE QUANTILE_UNION,
-    v2 QUANTILE_STATE QUANTILE_UNION
-    )
-    ENGINE=olap
-    AGGREGATE KEY(k1, k2)
-    DISTRIBUTED BY HASH(k1) BUCKETS 32;
-```
+     ```
+     CREATE TABLE example_db.example_table
+     (
+     k1 TINYINT,
+     k2 DECIMAL(10, 2) DEFAULT "10.5",
+     v1 QUANTILE_STATE QUANTILE_UNION,
+     v2 QUANTILE_STATE QUANTILE_UNION
+     )
+     ENGINE=olap
+     AGGREGATE KEY(k1, k2)
+     DISTRIBUTED BY HASH(k1) BUCKETS 32;
+     ```
 
-10. 创建两张支持Colocate Join的表t1 和t2
+11. 创建两张支持Colocate Join的表t1 和t2
 
-```
+    ```
     CREATE TABLE `t1` (
     `id` int(11) COMMENT "",
     `value` varchar(8) COMMENT ""
@@ -711,11 +736,11 @@ under the License.
     PROPERTIES (
     "colocate_with" = "t1"
     );
-```
+    ```
 
-11. 创建一个数据文件存储在BOS上的 broker 外部表
+12. 创建一个数据文件存储在BOS上的 broker 外部表
 
-```
+    ```
     CREATE EXTERNAL TABLE example_db.table_broker (
     k1 DATE
     )
@@ -729,11 +754,11 @@ under the License.
       "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
       "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
     )
-```
+    ```
 
-12. 创建一个带有bitmap 索引的表
+13. 创建一个带有bitmap 索引的表
 
-```
+    ```
     CREATE TABLE example_db.table_hash
     (
     k1 TINYINT,
@@ -746,18 +771,18 @@ under the License.
     AGGREGATE KEY(k1, k2)
     COMMENT "my first doris table"
     DISTRIBUTED BY HASH(k1) BUCKETS 32;
-```
+    ```
 
-13. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为: 
+14. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为: 
 
-```
-[types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
-[types: [DATE]; keys: [2020-01-09]; ‥types: [DATE]; keys: [2020-01-10]; )
-[types: [DATE]; keys: [2020-01-10]; ‥types: [DATE]; keys: [2020-01-11]; )
-[types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
-```
+    ```
+    [types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
+    [types: [DATE]; keys: [2020-01-09]; ‥types: [DATE]; keys: [2020-01-10]; )
+    [types: [DATE]; keys: [2020-01-10]; ‥types: [DATE]; keys: [2020-01-11]; )
+    [types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
+    ```
 
-```
+    ```
     CREATE TABLE example_db.dynamic_partition
     (
     k1 DATE,
@@ -778,10 +803,10 @@ under the License.
     "dynamic_partition.prefix" = "p",
     "dynamic_partition.buckets" = "32"
      );
-```
+    ```
 
-14. 创建一个带有rollup索引的表
-```
+15. 创建一个带有rollup索引的表
+    ```
     CREATE TABLE example_db.rollup_index_table
     (
         event_day DATE,
@@ -798,10 +823,10 @@ under the License.
     r3(event_day)
     )
     PROPERTIES("replication_num" = "3");
-```
-15. 创建一个内存表
+    ```
+16. 创建一个内存表
 
-```
+    ```
     CREATE TABLE example_db.table_hash
     (
     k1 TINYINT,
@@ -815,11 +840,11 @@ under the License.
     COMMENT "my first doris table"
     DISTRIBUTED BY HASH(k1) BUCKETS 32
     PROPERTIES ("in_memory"="true");
-```
+    ```
 
-16. 创建一个hive外部表
+17. 创建一个hive外部表
 
-```
+    ```
     CREATE TABLE example_db.table_hive
     (
       k1 TINYINT,
@@ -833,11 +858,11 @@ under the License.
       "table" = "hive_table_name",
       "hive.metastore.uris" = "thrift://127.0.0.1:9083"
     );
-```
+    ```
 
-17. 通过 replication_allocation 指定表的副本分布
+18. 通过 replication_allocation 指定表的副本分布
 
-```	
+    ```	
     CREATE TABLE example_db.table_hash
     (
     k1 TINYINT,
@@ -845,8 +870,8 @@ under the License.
     )
     DISTRIBUTED BY HASH(k1) BUCKETS 32
     PROPERTIES (
-		"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
-	);
+        "replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
+    );
 
 
     CREATE TABLE example_db.dynamic_partition
@@ -867,11 +892,11 @@ under the License.
     "dynamic_partition.buckets" = "32",
     "dynamic_partition."replication_allocation" = "tag.location.group_a:3"
      );
-```
+    ```
 
-17. 创建一个 Iceberg 外表
+19. 创建一个 Iceberg 外表
 
-```
+    ```
     CREATE TABLE example_db.t_iceberg 
     ENGINE=ICEBERG
     PROPERTIES (
@@ -880,7 +905,7 @@ under the License.
     "iceberg.hive.metastore.uris"  =  "thrift://127.0.0.1:9083",
     "iceberg.catalog.type"  =  "HIVE_CATALOG"
     );
-```
+    ```
 
 ## keyword
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md
index 07d87021fd..2f55b17bf6 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/DROP RESOURCE.md	
@@ -25,15 +25,20 @@ under the License.
 -->
 
 # DROP RESOURCE
-## description
+
+## Description
+    
     该语句用于删除一个已有的资源。仅 root 或 admin 用户可以删除资源。
     语法:
         DROP RESOURCE 'resource_name'
 
-## example
+    注意:正在使用的 ODBC/S3 资源无法删除。
+
+## Example
+    
     1. 删除名为 spark0 的 Spark 资源:
         DROP RESOURCE 'spark0';
 
 ## keyword
-    DROP, RESOURCE
 
+    DROP, RESOURCE
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md
index 12f118ae7f..97981e6424 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/SHOW RESOURCES.md	
@@ -25,30 +25,32 @@ under the License.
 -->
 
 # SHOW RESOURCES
-## description
+
+## Description
 
     该语句用于展示用户有使用权限的资源。普通用户仅能展示有使用权限的资源,root 或 admin 用户会展示所有的资源。
     
-    语法
+    语法:
     
         SHOW RESOURCES
         [
             WHERE 
             [NAME [ = "your_resource_name" | LIKE "name_matcher"]]
-            [RESOURCETYPE = ["SPARK"]]
+            [RESOURCETYPE = ["[spark|odbc_catalog|s3]"]]
         ]
         [ORDER BY ...]
         [LIMIT limit][OFFSET offset];
         
      说明:
-        1) 如果使用 NAME LIKE,则会匹配RESOURCES的Name包含 name_matcher的Resource
+        1) 如果使用 NAME LIKE,则会匹配 RESOURCES 的 Name 包含 name_matcher 的 Resource
         2) 如果使用 NAME = ,则精确匹配指定的 Name
-        3) 如果指定了RESOURCETYPE,则匹配对应的Resrouce类型
+        3) 如果指定了 RESOURCETYPE,则匹配对应的 Resrouce 类型
         4) 可以使用 ORDER BY 对任意列组合进行排序
         5) 如果指定了 LIMIT,则显示 limit 条匹配记录。否则全部显示
-        6) 如果指定了 OFFSET,则从偏移量offset开始显示查询结果。默认情况下偏移量为0。
+        6) 如果指定了 OFFSET,则从偏移量 offset 开始显示查询结果。默认情况下偏移量为 0。
+
+## Example
 
-## example
     1. 展示当前用户拥有权限的所有Resource
         SHOW RESOURCES;
     
@@ -60,5 +62,5 @@ under the License.
 
 
 ## keyword
-    SHOW RESOURCES
 
+    SHOW RESOURCES, RESOURCES
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index e5288a2c47..840365f92d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -34,9 +34,11 @@ public final class FeMetaVersion {
     public static final int VERSION_106 = 106;
     // support stream load 2PC
     public static final int VERSION_107 = 107;
+    // add storage_cold_medium and remote_storage_resource_name in DataProperty
+    public static final int VERSION_108 = 108;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_107;
-    
+    public static final int VERSION_CURRENT = VERSION_108;
+
     // all logs meta version should >= the minimum version, so that we could remove many if clause, for example
     // if (FE_METAVERSION < VERSION_94) ... 
     // these clause will be useless and we could remove them 
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 2a61e5c641..ea5d5cae3f 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -858,6 +858,10 @@ alter_stmt ::=
     {:
         RESULT = new AlterDatabasePropertyStmt(dbName, map);
     :}
+    | KW_ALTER KW_RESOURCE ident_or_text:resourceName opt_properties:properties
+    {:
+        RESULT = new AlterResourceStmt(resourceName, properties);
+    :}
     | KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
       opt_datasource_properties:datasourceProperties
     {:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 2667a0c12e..2f8a9d7cbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterViewStmt;
 import org.apache.doris.analysis.ColumnRenameClause;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.DropMaterializedViewStmt;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.ModifyColumnCommentClause;
@@ -51,6 +52,7 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Table.TableType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
@@ -59,6 +61,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
@@ -79,6 +82,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -654,15 +658,13 @@ public class Alter {
         }
 
         // get value from properties here
-        // 1. data property
-        DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(properties, null);
-        // 2. replica allocation
+        // 1. replica allocation
         ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
         Catalog.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
-        // 3. in memory
+        // 2. in memory
         boolean newInMemory = PropertyAnalyzer.analyzeBooleanProp(properties,
                 PropertyAnalyzer.PROPERTIES_INMEMORY, false);
-        // 4. tablet type
+        // 3. tablet type
         TTabletType tTabletType =
                 PropertyAnalyzer.analyzeTabletType(properties);
 
@@ -670,6 +672,23 @@ public class Alter {
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
         for (String partitionName : partitionNames) {
             Partition partition = olapTable.getPartition(partitionName);
+            // 4. data property
+            // 4.1 get old data property from partition
+            DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId());
+            // 4.2 combine the old properties with new ones
+            Map<String, String> newProperties = new HashMap<>();
+            newProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, dataProperty.getStorageMedium().name());
+            DateLiteral dateLiteral = new DateLiteral(dataProperty.getCooldownTimeMs(),
+                    TimeUtils.getTimeZone(), Type.DATETIME);
+            newProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, dateLiteral.getStringValue());
+            newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE, dataProperty.getRemoteStorageResourceName());
+            DateLiteral dateLiteral1 = new DateLiteral(dataProperty.getRemoteCooldownTimeMs(),
+                    TimeUtils.getTimeZone(), Type.DATETIME);
+            newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME, dateLiteral1.getStringValue());
+            newProperties.putAll(properties);
+            // 4.3 analyze new properties
+            DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(newProperties, null);
+
             // 1. date property
             if (newDataProperty != null) {
                 partitionInfo.setDataProperty(partition.getId(), newDataProperty);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterResourceStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterResourceStmt.java
new file mode 100644
index 0000000000..315fc01223
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterResourceStmt.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Resource;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Map;
+
+public class AlterResourceStmt extends DdlStmt {
+    private static final String TYPE = "type";
+
+    private final String resourceName;
+    private final Map<String, String> properties;
+
+    public AlterResourceStmt(String resourceName, Map<String, String> properties) {
+        this.resourceName = resourceName;
+        this.properties = properties;
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+
+        // check auth
+        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+
+        if (properties == null || properties.isEmpty()) {
+            throw new AnalysisException("Resource properties can't be null");
+        }
+
+        // check type in properties
+        if (properties.containsKey(TYPE)) {
+            throw new AnalysisException("Can not change resource type.");
+        }
+
+        // check resource existence
+        Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
+        if (resource == null) {
+            throw new AnalysisException("Unknown resource: " + resourceName);
+        }
+        // check properties
+        resource.checkProperties(properties);
+    }
+
+    @Override
+    public String toSql() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("ALTER RESOURCE '").append(resourceName).append("' ");
+        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java
index 504f197ab6..78ee203469 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java
@@ -18,13 +18,11 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.alter.AlterOpType;
-import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -98,10 +96,10 @@ public class ModifyPartitionClause extends AlterTableClause {
     // 3. in_memory
     // 4. tablet type
     private void checkProperties(Map<String, String> properties) throws AnalysisException {
-        // 1. data property
-        DataProperty newDataProperty = null;
-        newDataProperty = PropertyAnalyzer.analyzeDataProperty(properties, DataProperty.DEFAULT_DATA_PROPERTY);
-        Preconditions.checkNotNull(newDataProperty);
+        // 1. data property, can not modify partition property remote_storage_resource
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
+            throw new AnalysisException("Do not support modify partition data property `remote_storage_resource`.");
+        }
 
         // 2. replica allocation
         PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index da12a87366..ab69aaa690 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -88,6 +88,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
             this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
         } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TABLET_TYPE)) {
             throw new AnalysisException("Alter tablet type not supported");
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
+            throw new AnalysisException("Alter table remote_storage_resource is not supported.");
         } else {
             throw new AnalysisException("Unknown table property: " + properties.keySet());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3178f05e34..a670dd2aeb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3722,6 +3722,10 @@ public class Catalog {
         boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
         olapTable.setIsInMemory(isInMemory);
 
+        // set remote storage
+        String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
+        olapTable.setRemoteStorageResource(resourceName);
+
         TTabletType tabletType;
         try {
             tabletType = PropertyAnalyzer.analyzeTabletType(properties);
@@ -4246,6 +4250,13 @@ public class Catalog {
             sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
             sb.append(olapTable.getStorageFormat()).append("\"");
 
+            // remote storage resource
+            String remoteStorageResource = olapTable.getRemoteStorageResource();
+            if (!Strings.isNullOrEmpty(remoteStorageResource)) {
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
+                sb.append(remoteStorageResource).append("\"");
+            }
+
             sb.append("\n)");
         } else if (table.getType() == TableType.MYSQL) {
             MysqlTable mysqlTable = (MysqlTable) table;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
index 8d4c22c9e6..250cbbb332 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
@@ -18,10 +18,13 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TStorageMedium;
+
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
@@ -30,13 +33,18 @@ import java.io.IOException;
 
 public class DataProperty implements Writable {
     public static final DataProperty DEFAULT_DATA_PROPERTY = new DataProperty(
-            "SSD".equalsIgnoreCase(Config.default_storage_medium) ? TStorageMedium.SSD : TStorageMedium.HDD);
+            "SSD".equalsIgnoreCase(Config.default_storage_medium) ? TStorageMedium.SSD : TStorageMedium.HDD
+    );
     public static final long MAX_COOLDOWN_TIME_MS = 253402271999000L; // 9999-12-31 23:59:59
 
     @SerializedName(value =  "storageMedium")
     private TStorageMedium storageMedium;
     @SerializedName(value =  "cooldownTimeMs")
     private long cooldownTimeMs;
+    @SerializedName(value = "remoteStorageResourceName")
+    private String remoteStorageResourceName;
+    @SerializedName(value = "remoteCooldownTimeMs")
+    private long remoteCooldownTimeMs;
 
     private DataProperty() {
         // for persist
@@ -50,11 +58,16 @@ public class DataProperty implements Writable {
         } else {
             this.cooldownTimeMs = MAX_COOLDOWN_TIME_MS;
         }
+        this.remoteStorageResourceName = "";
+        this.remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
     }
 
-    public DataProperty(TStorageMedium medium, long cooldown) {
+    public DataProperty(TStorageMedium medium, long cooldown,
+                        String remoteStorageResourceName, long remoteCooldownTimeMs) {
         this.storageMedium = medium;
         this.cooldownTimeMs = cooldown;
+        this.remoteStorageResourceName = remoteStorageResourceName;
+        this.remoteCooldownTimeMs = remoteCooldownTimeMs;
     }
 
     public TStorageMedium getStorageMedium() {
@@ -65,7 +78,19 @@ public class DataProperty implements Writable {
         return cooldownTimeMs;
     }
 
+    public long getRemoteCooldownTimeMs() {
+        return remoteCooldownTimeMs;
+    }
+
+    public String getRemoteStorageResourceName() {
+        return remoteStorageResourceName;
+    }
+
     public static DataProperty read(DataInput in) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_108) {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, DataProperty.class);
+        }
         DataProperty dataProperty = new DataProperty();
         dataProperty.readFields(in);
         return dataProperty;
@@ -73,13 +98,15 @@ public class DataProperty implements Writable {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        Text.writeString(out, storageMedium.name());
-        out.writeLong(cooldownTimeMs);
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
     public void readFields(DataInput in) throws IOException {
         storageMedium = TStorageMedium.valueOf(Text.readString(in));
         cooldownTimeMs = in.readLong();
+        remoteStorageResourceName = "";
+        remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
     }
 
     @Override
@@ -95,14 +122,18 @@ public class DataProperty implements Writable {
         DataProperty other = (DataProperty) obj;
 
         return this.storageMedium == other.storageMedium
-                && this.cooldownTimeMs == other.cooldownTimeMs;
+                && this.cooldownTimeMs == other.cooldownTimeMs
+                && this.remoteCooldownTimeMs == other.remoteCooldownTimeMs
+                && this.remoteStorageResourceName.equals(other.remoteStorageResourceName);
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Storage medium[").append(this.storageMedium).append("]. ");
-        sb.append("cool down[").append(TimeUtils.longToTimeString(cooldownTimeMs)).append("].");
+        sb.append("cool down[").append(TimeUtils.longToTimeString(cooldownTimeMs)).append("]. ");
+        sb.append("remote storage resource name[").append(this.remoteStorageResourceName).append("]. ");
+        sb.append("remote cool down[").append(TimeUtils.longToTimeString(remoteCooldownTimeMs)).append("].");
         return sb.toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
index 9020cd8206..d03d4e88a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.proc.BaseProcResult;
 
@@ -93,6 +94,33 @@ public class OdbcCatalogResource extends Resource {
 
     }
 
+    @Override
+    public void modifyProperties(Map<String, String> properties) throws DdlException {
+        // modify properties
+        replaceIfEffectiveValue(this.configs, HOST, properties.get(HOST));
+        replaceIfEffectiveValue(this.configs, PORT, properties.get(PORT));
+        replaceIfEffectiveValue(this.configs, USER, properties.get(USER));
+        replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD));
+        replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE));
+        replaceIfEffectiveValue(this.configs, DRIVER, properties.get(DRIVER));
+    }
+
+    @Override
+    public void checkProperties(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> copiedProperties = Maps.newHashMap(properties);
+        // check properties
+        copiedProperties.remove(HOST);
+        copiedProperties.remove(PORT);
+        copiedProperties.remove(USER);
+        copiedProperties.remove(PASSWORD);
+        copiedProperties.remove(TYPE);
+        copiedProperties.remove(DRIVER);
+
+        if (!copiedProperties.isEmpty()) {
+            throw new AnalysisException("Unknown ODBC catalog resource properties: " + copiedProperties);
+        }
+    }
+
     public String getProperties(String propertiesKey)  {
         // check the properties key
         String value = configs.get(propertiesKey);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index d23a4609a0..0038095af1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1536,6 +1536,14 @@ public class OlapTable extends Table {
         tableProperty.buildDataSortInfo();
     }
 
+    public void setRemoteStorageResource(String resourceName) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.setRemoteStorageResource(resourceName);
+        tableProperty.buildRemoteStorageResource();
+    }
+
     // return true if partition with given name already exist, both in partitions and temp partitions.
     // return false otherwise
     public boolean checkPartitionNameExist(String partitionName) {
@@ -1688,6 +1696,13 @@ public class OlapTable extends Table {
         return tableProperty.getDataSortInfo();
     }
 
+    public String getRemoteStorageResource() {
+        if (tableProperty == null) {
+            return "";
+        }
+        return tableProperty.getRemoteStorageResource();
+    }
+
     // For non partitioned table:
     //   The table's distribute hash columns need to be a subset of the aggregate columns.
     //
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index 96a1ec6a6d..e62a6fa3b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.catalog;
 
+import com.google.common.base.Strings;
 import org.apache.doris.analysis.CreateResourceStmt;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.DeepCopy;
@@ -42,7 +44,8 @@ public abstract class Resource implements Writable {
     public enum ResourceType {
         UNKNOWN,
         SPARK,
-        ODBC_CATALOG;
+        ODBC_CATALOG,
+        S3;
 
         public static ResourceType fromString(String resourceType) {
             for (ResourceType type : ResourceType.values()) {
@@ -68,20 +71,35 @@ public abstract class Resource implements Writable {
     }
 
     public static Resource fromStmt(CreateResourceStmt stmt) throws DdlException {
-        Resource resource = null;
-        ResourceType type = stmt.getResourceType();
+        Resource resource = getResourceInstance(stmt.getResourceType(), stmt.getResourceName());
+        resource.setProperties(stmt.getProperties());
+
+        return resource;
+    }
+
+    /**
+     * Get resource instance by resource name and type
+     * @param type
+     * @param name
+     * @return
+     * @throws DdlException
+     */
+    private static Resource getResourceInstance(ResourceType type, String name) throws DdlException {
+        Resource resource;
         switch (type) {
             case SPARK:
-                resource = new SparkResource(stmt.getResourceName());
+                resource = new SparkResource(name);
                 break;
             case ODBC_CATALOG:
-                resource = new OdbcCatalogResource(stmt.getResourceName());
+                resource = new OdbcCatalogResource(name);
+                break;
+            case S3:
+                resource = new S3Resource(name);
                 break;
             default:
-                throw new DdlException("Only support Spark resource.");
+                throw new DdlException("Unknown resource type: " + type);
         }
 
-        resource.setProperties(stmt.getProperties());
         return resource;
     }
 
@@ -93,6 +111,26 @@ public abstract class Resource implements Writable {
         return type;
     }
 
+    /**
+     * Modify properties in child resources
+     * @param properties
+     * @throws DdlException
+     */
+    public abstract void modifyProperties(Map<String, String> properties) throws DdlException;
+
+    /**
+     * Check properties in child resources
+     * @param properties
+     * @throws AnalysisException
+     */
+    public abstract void checkProperties(Map<String, String> properties) throws AnalysisException;
+
+    protected void replaceIfEffectiveValue(Map<String, String> properties, String key, String value) {
+        if (!Strings.isNullOrEmpty(value)) {
+            properties.put(key, value);
+        }
+    }
+
     /**
      * Set and check the properties in child resources
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 7714722cef..9908e5ef52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.analysis.DropResourceStmt;
 import org.apache.doris.catalog.Resource.ResourceType;
@@ -42,8 +43,10 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import com.google.gson.annotations.SerializedName;
@@ -69,14 +72,16 @@ public class ResourceMgr implements Writable {
     }
 
     public void createResource(CreateResourceStmt stmt) throws DdlException {
-        if (stmt.getResourceType() != ResourceType.SPARK && stmt.getResourceType() != ResourceType.ODBC_CATALOG) {
-            throw new DdlException("Only support SPARK and ODBC_CATALOG resource.");
+        if (stmt.getResourceType() != ResourceType.SPARK
+                && stmt.getResourceType() != ResourceType.ODBC_CATALOG
+                && stmt.getResourceType() != ResourceType.S3) {
+            throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource.");
         }
         Resource resource = Resource.fromStmt(stmt);
         createResource(resource);
         // log add
         Catalog.getCurrentCatalog().getEditLog().logCreateResource(resource);
-        LOG.info("create resource success. resource: {}", resource);
+        LOG.info("Create resource success. Resource: {}", resource);
     }
 
     public void createResource(Resource resource) throws DdlException {
@@ -91,14 +96,46 @@ public class ResourceMgr implements Writable {
     }
 
     public void dropResource(DropResourceStmt stmt) throws DdlException {
-        String name = stmt.getResourceName();
-        if (nameToResource.remove(name) == null) {
-            throw new DdlException("Resource(" + name + ") does not exist");
+        String resourceName = stmt.getResourceName();
+        if (!nameToResource.containsKey(resourceName)) {
+            throw new DdlException("Resource(" + resourceName + ") does not exist");
         }
-
+        // Check whether the resource is in use before deleting it, except spark resource
+        List<String> usedTables = new ArrayList<>();
+        List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
+        for (Long dbId : dbIds) {
+            Optional<Database> database = Catalog.getCurrentCatalog().getDb(dbId);
+            database.ifPresent(db -> {
+                List<Table> tables = db.getTablesOnIdOrder();
+                for (Table table : tables) {
+                    if (table instanceof OdbcTable) {
+                        // odbc resource
+                        if (resourceName.equals(((OdbcTable) table).getOdbcCatalogResourceName())) {
+                            usedTables.add(db.getFullName() + "." + table.getName());
+                        }
+                    } else if (table instanceof OlapTable) {
+                        // remote resource, such as s3 resource
+                        PartitionInfo partitionInfo = ((OlapTable) table).getPartitionInfo();
+                        List<Long> partitionIds = ((OlapTable) table).getPartitionIds();
+                        for (Long partitionId : partitionIds) {
+                            DataProperty dataProperty = partitionInfo.getDataProperty(partitionId);
+                            if (resourceName.equals(dataProperty.getRemoteStorageResourceName())) {
+                                usedTables.add(db.getFullName() + "." + table.getName());
+                                break;
+                            }
+                        }
+                    }
+                }
+            });
+        }
+        if (usedTables.size() > 0) {
+            LOG.warn("Can not drop resource, since it's used in tables {}", usedTables);
+            throw new DdlException("Can not drop resource, since it's used in tables " + usedTables);
+        }
+        nameToResource.remove(resourceName);
         // log drop
-        Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(name));
-        LOG.info("drop resource success. resource name: {}", name);
+        Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(resourceName));
+        LOG.info("Drop resource success. Resource resourceName: {}", resourceName);
     }
 
     // Drop resource whether successful or not
@@ -114,6 +151,26 @@ public class ResourceMgr implements Writable {
         nameToResource.remove(operationLog.getName());
     }
 
+    public void alterResource(AlterResourceStmt stmt) throws DdlException {
+        String resourceName = stmt.getResourceName();
+        Map<String, String> properties = stmt.getProperties();
+
+        if (!nameToResource.containsKey(resourceName)) {
+            throw new DdlException("Resource(" + resourceName + ") dose not exist.");
+        }
+
+        Resource resource = nameToResource.get(resourceName);
+        resource.modifyProperties(properties);
+
+        // log alter
+        Catalog.getCurrentCatalog().getEditLog().logAlterResource(resource);
+        LOG.info("Alter resource success. Resource: {}", resource);
+    }
+
+    public void replayAlterResource(Resource resource) {
+        nameToResource.put(resource.getName(), resource);
+    }
+
     public boolean containsResource(String name) {
         return nameToResource.containsKey(name);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
new file mode 100644
index 0000000000..0b6eda828f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.proc.BaseProcResult;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+/**
+ * S3 resource for olap table
+ *
+ * Syntax:
+ * CREATE RESOURCE "remote_s3"
+ * PROPERTIES
+ * (
+ *    "type" = "s3",
+ *    "s3_endpoint" = "bj",
+ *    "s3_region" = "bj",
+ *    "s3_root_path" = "/path/to/root",
+ *    "s3_access_key" = "bbb",
+ *    "s3_secret_key" = "aaaa",
+ *    "s3_max_connections" = "50",
+ *    "s3_request_timeout_ms" = "3000",
+ *    "s3_connection_timeout_ms" = "1000"
+ * );
+ */
+public class S3Resource extends Resource {
+    // required
+    private static final String S3_ENDPOINT = "s3_endpoint";
+    private static final String S3_REGION = "s3_region";
+    private static final String S3_ROOT_PATH = "s3_root_path";
+    private static final String S3_ACCESS_KEY = "s3_access_key";
+    private static final String S3_SECRET_KEY = "s3_secret_key";
+
+    // optional
+    private static final String S3_MAX_CONNECTIONS = "s3_max_connections";
+    private static final String S3_REQUEST_TIMEOUT_MS = "s3_request_timeout_ms";
+    private static final String S3_CONNECTION_TIMEOUT_MS = "s3_connection_timeout_ms";
+    private static final String DEFAULT_S3_MAX_CONNECTIONS = "50";
+    private static final String DEFAULT_S3_REQUEST_TIMEOUT_MS = "3000";
+    private static final String DEFAULT_S3_CONNECTION_TIMEOUT_MS = "1000";
+
+    @SerializedName(value = "properties")
+    private Map<String, String> properties;
+
+    public S3Resource(String name) {
+        this(name, Maps.newHashMap());
+    }
+
+    public S3Resource(String name, Map<String, String> properties) {
+        super(name, ResourceType.S3);
+        this.properties = properties;
+    }
+
+    public String getProperty(String propertyKey) {
+        return properties.get(propertyKey);
+    }
+
+    @Override
+    protected void setProperties(Map<String, String> properties) throws DdlException {
+        Preconditions.checkState(properties != null);
+        this.properties = properties;
+        // check properties
+        // required
+        checkRequiredProperty(S3_ENDPOINT);
+        checkRequiredProperty(S3_REGION);
+        checkRequiredProperty(S3_ROOT_PATH);
+        checkRequiredProperty(S3_ACCESS_KEY);
+        checkRequiredProperty(S3_SECRET_KEY);
+        // optional
+        checkOptionalProperty(S3_MAX_CONNECTIONS, DEFAULT_S3_MAX_CONNECTIONS);
+        checkOptionalProperty(S3_REQUEST_TIMEOUT_MS, DEFAULT_S3_REQUEST_TIMEOUT_MS);
+        checkOptionalProperty(S3_CONNECTION_TIMEOUT_MS, DEFAULT_S3_CONNECTION_TIMEOUT_MS);
+    }
+
+    private void checkRequiredProperty(String propertyKey) throws DdlException {
+        String value = properties.get(propertyKey);
+
+        if (Strings.isNullOrEmpty(value)) {
+            throw new DdlException("Missing [" + propertyKey + "] in properties.");
+        }
+    }
+
+    private void checkOptionalProperty(String propertyKey, String defaultValue) {
+        this.properties.putIfAbsent(propertyKey, defaultValue);
+    }
+
+    @Override
+    public void modifyProperties(Map<String, String> properties) throws DdlException {
+        // modify properties
+        replaceIfEffectiveValue(this.properties, S3_ENDPOINT, properties.get(S3_ENDPOINT));
+        replaceIfEffectiveValue(this.properties, S3_REGION, properties.get(S3_REGION));
+        replaceIfEffectiveValue(this.properties, S3_ROOT_PATH, properties.get(S3_ROOT_PATH));
+        replaceIfEffectiveValue(this.properties, S3_ACCESS_KEY, properties.get(S3_ACCESS_KEY));
+        replaceIfEffectiveValue(this.properties, S3_SECRET_KEY, properties.get(S3_SECRET_KEY));
+        replaceIfEffectiveValue(this.properties, S3_MAX_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
+        replaceIfEffectiveValue(this.properties, S3_REQUEST_TIMEOUT_MS, properties.get(S3_REQUEST_TIMEOUT_MS));
+        replaceIfEffectiveValue(this.properties, S3_CONNECTION_TIMEOUT_MS, properties.get(S3_CONNECTION_TIMEOUT_MS));
+    }
+
+    @Override
+    public void checkProperties(Map<String, String> properties) throws AnalysisException {
+        // check properties
+        Map<String, String> copiedProperties = Maps.newHashMap(properties);
+        copiedProperties.remove(S3_ENDPOINT);
+        copiedProperties.remove(S3_REGION);
+        copiedProperties.remove(S3_ROOT_PATH);
+        copiedProperties.remove(S3_ACCESS_KEY);
+        copiedProperties.remove(S3_SECRET_KEY);
+        copiedProperties.remove(S3_MAX_CONNECTIONS);
+        copiedProperties.remove(S3_REQUEST_TIMEOUT_MS);
+        copiedProperties.remove(S3_CONNECTION_TIMEOUT_MS);
+
+        if (!copiedProperties.isEmpty()) {
+            throw new AnalysisException("Unknown S3 resource properties: " + copiedProperties);
+        }
+    }
+
+    @Override
+    protected void getProcNodeData(BaseProcResult result) {
+        String lowerCaseType = type.name().toLowerCase();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            // it's dangerous to show password in show odbc resource,
+            // so we use empty string to replace the real password
+            if (entry.getKey().equals(S3_ACCESS_KEY)) {
+                result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), ""));
+            } else {
+                result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index 0bda48bf47..74144f8662 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
@@ -206,6 +207,11 @@ public class SparkResource extends Resource {
             return;
         }
 
+        // update properties
+        updateProperties(properties);
+    }
+
+    private void updateProperties(Map<String, String> properties) throws DdlException {
         // update spark configs
         if (properties.containsKey(SPARK_MASTER)) {
             throw new DdlException("Cannot change spark master");
@@ -291,6 +297,24 @@ public class SparkResource extends Resource {
         return brokerProperties;
     }
 
+    @Override
+    public void modifyProperties(Map<String, String> properties) throws DdlException {
+        updateProperties(properties);
+    }
+
+    @Override
+    public void checkProperties(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> copiedProperties = Maps.newHashMap(properties);
+        copiedProperties.keySet().removeAll(getSparkConfig(properties).keySet());
+        copiedProperties.keySet().removeAll(getBrokerProperties(properties).keySet());
+        copiedProperties.remove(BROKER);
+        copiedProperties.remove(WORKING_DIR);
+
+        if (!copiedProperties.isEmpty()) {
+            throw new AnalysisException("Unknown spark resource properties: " + copiedProperties);
+        }
+    }
+
     @Override
     protected void getProcNodeData(BaseProcResult result) {
         String lowerCaseType = type.name().toLowerCase();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 7c0d6f277e..c3e5317c51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -72,6 +72,9 @@ public class TableProperty implements Writable {
 
     private DataSortInfo dataSortInfo = new DataSortInfo();
 
+    // remote storage resource, for cold data
+    private String remoteStorageResource;
+
     public TableProperty(Map<String, String> properties) {
         this.properties = properties;
     }
@@ -159,6 +162,11 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildRemoteStorageResource() {
+        remoteStorageResource = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE, "");
+        return this;
+    }
+
     public void modifyTableProperties(Map<String, String> modifyProperties) {
         properties.putAll(modifyProperties);
         removeDuplicateReplicaNumProperty();
@@ -176,6 +184,12 @@ public class TableProperty implements Writable {
                 replicaAlloc.toCreateStmt());
     }
 
+    public void setRemoteStorageResource(String resourceName) {
+        this.remoteStorageResource = resourceName;
+        properties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE,
+                resourceName);
+    }
+
     public ReplicaAllocation getReplicaAllocation() {
         return replicaAlloc;
     }
@@ -214,6 +228,10 @@ public class TableProperty implements Writable {
         return dataSortInfo;
     }
 
+    public String getRemoteStorageResource() {
+        return remoteStorageResource;
+    }
+
     public void buildReplicaAllocation() {
         try {
             // Must copy the properties because "analyzeReplicaAllocation" with remove the property
@@ -237,7 +255,8 @@ public class TableProperty implements Writable {
                 .executeBuildDynamicProperty()
                 .buildInMemory()
                 .buildStorageFormat()
-                .buildDataSortInfo();
+                .buildDataSortInfo()
+                .buildRemoteStorageResource();
         if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
             // get replica num from property map and create replica allocation
             String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index e85b620d01..361e429c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -242,7 +242,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
         partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
         String cooldownTime = DynamicPartitionUtil.getPartitionRangeString(property, now, offset + hotPartitionNum,
                 DynamicPartitionUtil.DATETIME_FORMAT);
-        partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COLDOWN_TIME, cooldownTime);
+        partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
     }
 
     private Range<PartitionKey> getClosedRange(Database db, OlapTable olapTable, Column partitionColumn, String partitionFormat,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 661eaaddc7..523f57a98c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -67,6 +67,7 @@ public class PartitionsProcDir implements ProcDirInterface {
             .add("VisibleVersion").add("VisibleVersionTime")
             .add("State").add("PartitionKey").add("Range").add("DistributionKey")
             .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
+            .add("RemoteStorageResource").add("RemoteStorageCooldownTime")
             .add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
             .build();
 
@@ -269,6 +270,8 @@ public class PartitionsProcDir implements ProcDirInterface {
                 DataProperty dataProperty = tblPartitionInfo.getDataProperty(partitionId);
                 partitionInfo.add(dataProperty.getStorageMedium().name());
                 partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs()));
+                partitionInfo.add(dataProperty.getRemoteStorageResourceName());
+                partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getRemoteCooldownTimeMs()));
 
                 partitionInfo.add(TimeUtils.longToTimeString(partition.getLastCheckTime()));
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index ceac147bae..dca2b5f762 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -19,12 +19,14 @@ package org.apache.doris.common.util;
 
 import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
@@ -56,7 +58,8 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_REPLICATION_ALLOCATION = "replication_allocation";
     public static final String PROPERTIES_STORAGE_TYPE = "storage_type";
     public static final String PROPERTIES_STORAGE_MEDIUM = "storage_medium";
-    public static final String PROPERTIES_STORAGE_COLDOWN_TIME = "storage_cooldown_time";
+    public static final String PROPERTIES_STORAGE_COOLDOWN_TIME = "storage_cooldown_time";
+    public static final String PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME = "remote_storage_cooldown_time";
     // for 1.x -> 2.x migration
     public static final String PROPERTIES_VERSION_INFO = "version_info";
     // for restore
@@ -85,6 +88,8 @@ public class PropertyAnalyzer {
 
     public static final String PROPERTIES_INMEMORY = "in_memory";
 
+    public static final String PROPERTIES_REMOTE_STORAGE_RESOURCE = "remote_storage_resource";
+
     public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
 
     public static final String PROPERTIES_STRICT_RANGE = "strict_range";
@@ -105,15 +110,19 @@ public class PropertyAnalyzer {
 
     public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
             throws AnalysisException {
-        if (properties == null) {
+        if (properties == null || properties.isEmpty()) {
             return oldDataProperty;
         }
 
         TStorageMedium storageMedium = null;
-        long coolDownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
+        long cooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
+        String remoteStorageResourceName = "";
+        long remoteCooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
 
         boolean hasMedium = false;
         boolean hasCooldown = false;
+        boolean hasRemoteStorageResource = false;
+        boolean hasRemoteCooldown = false;
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
@@ -126,42 +135,83 @@ public class PropertyAnalyzer {
                 } else {
                     throw new AnalysisException("Invalid storage medium: " + value);
                 }
-            } else if (!hasCooldown && key.equalsIgnoreCase(PROPERTIES_STORAGE_COLDOWN_TIME)) {
-                hasCooldown = true;
+            } else if (!hasCooldown && key.equalsIgnoreCase(PROPERTIES_STORAGE_COOLDOWN_TIME)) {
+                DateLiteral dateLiteral = new DateLiteral(value, Type.DATETIME);
+                cooldownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+                if (cooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
+                    hasCooldown = true;
+                }
+            } else if (!hasRemoteStorageResource && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
+                if (!Strings.isNullOrEmpty(value)) {
+                    hasRemoteStorageResource = true;
+                    remoteStorageResourceName = value;
+                }
+            } else if (!hasRemoteCooldown && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME)) {
                 DateLiteral dateLiteral = new DateLiteral(value, Type.DATETIME);
-                coolDownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+                remoteCooldownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+                if (remoteCooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
+                    hasRemoteCooldown = true;
+                }
             }
         } // end for properties
 
+        // Check properties
+
         if (!hasCooldown && !hasMedium) {
             return oldDataProperty;
         }
 
         properties.remove(PROPERTIES_STORAGE_MEDIUM);
-        properties.remove(PROPERTIES_STORAGE_COLDOWN_TIME);
+        properties.remove(PROPERTIES_STORAGE_COOLDOWN_TIME);
+        properties.remove(PROPERTIES_REMOTE_STORAGE_RESOURCE);
+        properties.remove(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME);
+
 
         if (hasCooldown && !hasMedium) {
             throw new AnalysisException("Invalid data property. storage medium property is not found");
         }
 
         if (storageMedium == TStorageMedium.HDD && hasCooldown) {
-            throw new AnalysisException("Can not assign cooldown timestamp to HDD storage medium");
+            cooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
+            LOG.info("Can not assign cool down timestamp to HDD storage medium, ignore user setting.");
+            hasCooldown = false;
         }
 
         long currentTimeMs = System.currentTimeMillis();
         if (storageMedium == TStorageMedium.SSD && hasCooldown) {
-            if (coolDownTimeStamp <= currentTimeMs) {
-                throw new AnalysisException("Cooldown time should later than now");
+            if (cooldownTimeStamp <= currentTimeMs) {
+                throw new AnalysisException("Cool down time should later than now");
             }
         }
 
         if (storageMedium == TStorageMedium.SSD && !hasCooldown) {
             // set default cooldown time
-            coolDownTimeStamp = currentTimeMs + Config.storage_cooldown_second * 1000L;
+            cooldownTimeStamp = currentTimeMs + Config.storage_cooldown_second * 1000L;
+        }
+
+        // check remote_storage_resource and remote_storage_cooldown_time
+        if ((!hasRemoteCooldown && hasRemoteStorageResource) || (hasRemoteCooldown && !hasRemoteStorageResource)) {
+            throw new AnalysisException("Invalid data property, " +
+                    "`remote_storage_resource` and `remote_storage_cooldown_time` must be used together.");
+        }
+        if (hasRemoteStorageResource && hasRemoteCooldown) {
+            // check remote resource
+            Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(remoteStorageResourceName);
+            if (resource == null) {
+                throw new AnalysisException("Invalid data property, " +
+                        "`remote_storage_resource` [" + remoteStorageResourceName + "] dose not exist.");
+            }
+            // check remote storage cool down timestamp
+            if (remoteCooldownTimeStamp <= currentTimeMs) {
+                throw new AnalysisException("Remote storage cool down time should later than now");
+            }
+            if (hasCooldown && (remoteCooldownTimeStamp <= cooldownTimeStamp)) {
+                throw new AnalysisException("`remote_storage_cooldown_time` should later than `storage_cooldown_time`.");
+            }
         }
 
         Preconditions.checkNotNull(storageMedium);
-        return new DataProperty(storageMedium, coolDownTimeStamp);
+        return new DataProperty(storageMedium, cooldownTimeStamp, remoteStorageResourceName, remoteCooldownTimeStamp);
     }
     
     public static short analyzeShortKeyColumnCount(Map<String, String> properties) throws AnalysisException {
@@ -421,6 +471,21 @@ public class PropertyAnalyzer {
         return defaultVal;
     }
 
+    // analyze remote storage resource
+    public static String analyzeRemoteStorageResource(Map<String, String> properties) throws AnalysisException {
+        String resourceName = "";
+        if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
+            resourceName = properties.get(PROPERTIES_REMOTE_STORAGE_RESOURCE);
+            // check resource existence
+            Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
+            if (resource == null) {
+                throw new AnalysisException("Resource does not exist, name: " + resourceName);
+            }
+        }
+
+        return resourceName;
+    }
+
     // analyze property like : "type" = "xxx";
     public static String analyzeType(Map<String, String> properties) throws AnalysisException {
         String type = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index e35011ce76..d9166c9ec2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -533,7 +533,8 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_CREATE_RESOURCE: {
+            case OperationType.OP_CREATE_RESOURCE:
+            case OperationType.OP_ALTER_RESOURCE:{
                 data = Resource.read(in);
                 isRead = true;
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 574c77e156..f68b5fc33a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -682,6 +682,11 @@ public class EditLog {
                     catalog.getResourceMgr().replayDropResource(operationLog);
                     break;
                 }
+                case OperationType.OP_ALTER_RESOURCE: {
+                    final Resource resource = (Resource) journal.getData();
+                    catalog.getResourceMgr().replayAlterResource(resource);
+                    break;
+                }
                 case OperationType.OP_CREATE_SMALL_FILE: {
                     SmallFile smallFile = (SmallFile) journal.getData();
                     catalog.getSmallFileMgr().replayCreateFile(smallFile);
@@ -1311,6 +1316,10 @@ public class EditLog {
         logEdit(OperationType.OP_DROP_RESOURCE, operationLog);
     }
 
+    public void logAlterResource(Resource resource) {
+        logEdit(OperationType.OP_ALTER_RESOURCE, resource);
+    }
+
     public void logCreateSmallFile(SmallFile info) {
         logEdit(OperationType.OP_CREATE_SMALL_FILE, info);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index bb5aaa971e..3c7a42dddd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -207,6 +207,7 @@ public class OperationType {
     // resource 276~290
     public static final short OP_CREATE_RESOURCE = 276;
     public static final short OP_DROP_RESOURCE = 277;
+    public static final short OP_ALTER_RESOURCE = 278;
 
     // alter external table
     public static final short OP_ALTER_EXTERNAL_TABLE_SCHEMA = 280;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 300f25f094..2c91e79428 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.OdbcCatalogResource;
 import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.catalog.StructType;
@@ -109,7 +110,8 @@ public class GsonUtils {
     private static RuntimeTypeAdapterFactory<Resource> resourceTypeAdapterFactory = RuntimeTypeAdapterFactory
             .of(Resource.class, "clazz")
             .registerSubtype(SparkResource.class, SparkResource.class.getSimpleName())
-            .registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName());
+            .registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName())
+            .registerSubtype(S3Resource.class, S3Resource.class.getSimpleName());
 
     // runtime adapter for class "AlterJobV2"
     private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index c35a27b8f9..f8f0102072 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -31,6 +31,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
 import org.apache.doris.analysis.AlterDatabasePropertyStmt;
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
 import org.apache.doris.analysis.AlterDatabaseRename;
+import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
 import org.apache.doris.analysis.AlterSystemStmt;
@@ -299,6 +300,8 @@ public class DdlExecutor {
             catalog.getRefreshManager().handleRefreshTable((RefreshTableStmt) ddlStmt);
         } else if (ddlStmt instanceof RefreshDbStmt) {
             catalog.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
+        } else if (ddlStmt instanceof AlterResourceStmt) {
+            catalog.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
         } else {
             throw new DdlException("Unknown statement.");
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
index c055ba20ba..74a18f7007 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
@@ -19,8 +19,10 @@ package org.apache.doris.alter;
 
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DropResourceStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
@@ -36,6 +38,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -132,7 +135,7 @@ public class AlterTest {
                 "    'replication_num' = '1',\n" +
                 "    'in_memory' = 'false',\n" +
                 "    'storage_medium' = 'SSD',\n" +
-                "    'storage_cooldown_time' = '9999-12-31 00:00:00'\n" +
+                "    'storage_cooldown_time' = '2999-12-31 00:00:00'\n" +
                 ");");
 
         createTable("CREATE TABLE test.tbl5\n" +
@@ -167,6 +170,59 @@ public class AlterTest {
                 "\"driver\" = \"Oracle Driver\",\n" +
                 "\"odbc_type\" = \"oracle\"\n" +
                 ");");
+
+        // s3 resource
+        createRemoteStorageResource("create resource \"remote_s3\"\n" +
+                "properties\n" +
+                "(\n" +
+                "   \"type\" = \"s3\", \n" +
+                "   \"s3_endpoint\" = \"bj\",\n" +
+                "   \"s3_region\" = \"bj\",\n" +
+                "   \"s3_root_path\" = \"/path/to/root\",\n" +
+                "   \"s3_access_key\" = \"bbb\",\n" +
+                "   \"s3_secret_key\" = \"aaaa\",\n" +
+                "   \"s3_max_connections\" = \"50\",\n" +
+                "   \"s3_request_timeout_ms\" = \"3000\",\n" +
+                "   \"s3_connection_timeout_ms\" = \"1000\"\n" +
+                ");");
+
+        createRemoteStorageResource("create resource \"remote_s3_1\"\n" +
+                "properties\n" +
+                "(\n" +
+                "   \"type\" = \"s3\", \n" +
+                "   \"s3_endpoint\" = \"bj\",\n" +
+                "   \"s3_region\" = \"bj\",\n" +
+                "   \"s3_root_path\" = \"/path/to/root\",\n" +
+                "   \"s3_access_key\" = \"bbb\",\n" +
+                "   \"s3_secret_key\" = \"aaaa\",\n" +
+                "   \"s3_max_connections\" = \"50\",\n" +
+                "   \"s3_request_timeout_ms\" = \"3000\",\n" +
+                "   \"s3_connection_timeout_ms\" = \"1000\"\n" +
+                ");");
+
+        createTable("CREATE TABLE test.tbl_remote\n" +
+                "(\n" +
+                "    k1 date,\n" +
+                "    k2 int,\n" +
+                "    v1 int sum\n" +
+                ")\n" +
+                "PARTITION BY RANGE(k1)\n" +
+                "(\n" +
+                "    PARTITION p1 values less than('2020-02-01'),\n" +
+                "    PARTITION p2 values less than('2020-03-01'),\n" +
+                "    PARTITION p3 values less than('2020-04-01'),\n" +
+                "    PARTITION p4 values less than('2020-05-01')\n" +
+                ")\n" +
+                "DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
+                "PROPERTIES" +
+                "(" +
+                "    'replication_num' = '1',\n" +
+                "    'in_memory' = 'false',\n" +
+                "    'storage_medium' = 'SSD',\n" +
+                "    'storage_cooldown_time' = '2122-04-01 20:24:00',\n" +
+                "    'remote_storage_resource' = 'remote_s3',\n" +
+                "    'remote_storage_cooldown_time' = '2122-12-01 20:23:00'" +
+                ");");
     }
 
     @AfterClass
@@ -180,6 +236,11 @@ public class AlterTest {
         Catalog.getCurrentCatalog().createTable(createTableStmt);
     }
 
+    private static void createRemoteStorageResource(String sql) throws Exception {
+        CreateResourceStmt stmt = (CreateResourceStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().getResourceMgr().createResource(stmt);
+    }
+
     private static void alterTable(String sql, boolean expectedException) throws Exception {
         try {
             AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
@@ -383,21 +444,37 @@ public class AlterTest {
         }
         Assert.assertEquals(false, tbl4.getPartitionInfo().getIsInMemory(p4.getId()));
 
-        // batch update storage_medium and storage_cool_down properties
-        stmt = "alter table test.tbl4 modify partition (p2, p3, p4) set ('storage_medium' = 'HDD')";
-        DateLiteral dateLiteral = new DateLiteral("9999-12-31 00:00:00", Type.DATETIME);
-        long coolDownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
-        DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, coolDownTimeMs);
-        partitionList = Lists.newArrayList(p2, p3, p4);
+        // batch update storage_medium and storage_cooldown properties
+        // alter storage_medium
+        stmt = "alter table test.tbl4 modify partition (p3, p4) set ('storage_medium' = 'HDD')";
+        DateLiteral dateLiteral = new DateLiteral("2999-12-31 00:00:00", Type.DATETIME);
+        long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+        DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
+        partitionList = Lists.newArrayList(p3, p4);
         for (Partition partition : partitionList) {
             Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
         }
         alterTable(stmt, false);
-        DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS);
+        DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "", DataProperty.MAX_COOLDOWN_TIME_MS);
         for (Partition partition : partitionList) {
             Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
         }
         Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(p1.getId()));
+        Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(p2.getId()));
+
+        // alter cooldown_time
+        stmt = "alter table test.tbl4 modify partition (p1, p2) set ('storage_cooldown_time' = '2100-12-31 00:00:00')";
+        alterTable(stmt, false);
+
+        dateLiteral = new DateLiteral("2100-12-31 00:00:00", Type.DATETIME);
+        cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+        DataProperty newDataProperty1 = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
+        partitionList = Lists.newArrayList(p1, p2);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(newDataProperty1, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+        Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(p3.getId()));
+        Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(p4.getId()));
 
         // batch update range partitions' properties with *
         stmt = "alter table test.tbl4 modify partition (*) set ('replication_num' = '1')";
@@ -408,6 +485,75 @@ public class AlterTest {
         }
     }
 
+    @Test
+    public void testAlterRemoteStorageTableDataProperties() throws Exception {
+        Database db = Catalog.getCurrentCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tblRemote = (OlapTable) db.getTableOrMetaException("tbl_remote");
+        Partition p1 = tblRemote.getPartition("p1");
+        Partition p2 = tblRemote.getPartition("p2");
+        Partition p3 = tblRemote.getPartition("p3");
+        Partition p4 = tblRemote.getPartition("p4");
+
+        DateLiteral dateLiteral = new DateLiteral("2122-04-01 20:24:00", Type.DATETIME);
+        long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+        DateLiteral dateLiteral1 = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
+        long remoteCooldownTimeMs = dateLiteral1.unixTimestamp(TimeUtils.getTimeZone());
+        DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
+        List<Partition> partitionList = Lists.newArrayList(p2, p3, p4);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+
+        // alter cooldown_time
+        String stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('storage_cooldown_time' = '2100-04-01 22:22:22')";
+        alterTable(stmt, false);
+        DateLiteral newDateLiteral = new DateLiteral("2100-04-01 22:22:22", Type.DATETIME);
+        long newCooldownTimeMs = newDateLiteral.unixTimestamp(TimeUtils.getTimeZone());
+        DataProperty dataProperty2 = new DataProperty(TStorageMedium.SSD, newCooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(dataProperty2, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+        Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
+
+        // alter storage_medium
+        stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('storage_medium' = 'HDD')";
+        alterTable(stmt, false);
+        DataProperty dataProperty1 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", remoteCooldownTimeMs);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(dataProperty1, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+        Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
+
+        // alter remote_storage
+        stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_resource' = 'remote_s3_1')";
+        alterTable(stmt, true);
+        Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
+
+        // alter remote_storage_cooldown_time
+        stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_cooldown_time' = '2122-12-01 20:23:00')";
+        alterTable(stmt, false);
+        DateLiteral newRemoteDate = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
+        long newRemoteCooldownTimeMs = newRemoteDate.unixTimestamp(TimeUtils.getTimeZone());
+        DataProperty dataProperty4 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", newRemoteCooldownTimeMs);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(dataProperty4, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+        Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
+
+        // alter recover to old state
+        stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set (" +
+                "'storage_medium' = 'SSD', " +
+                "'storage_cooldown_time' = '2122-04-01 20:24:00', " +
+                "'remote_storage_cooldown_time' = '2122-12-01 20:23:00'" +
+                ")";
+        alterTable(stmt, false);
+        for (Partition partition : partitionList) {
+            Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
+        }
+        Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
+
+    }
+
     @Test
     public void testDynamicPartitionDropAndAdd() throws Exception {
         // test day range
@@ -890,4 +1036,11 @@ public class AlterTest {
         Assert.assertEquals("tbl1", odbcTable.getOdbcTableName());
         Assert.assertEquals("MySQL", odbcTable.getOdbcDriver());
     }
+
+    @Test(expected = DdlException.class)
+    public void testDropInUseResource() throws Exception {
+        String sql = "drop resource remote_s3";
+        DropResourceStmt stmt = (DropResourceStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().getResourceMgr().dropResource(stmt);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java
index 5fc91f73cb..b7d66ff531 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.Resource.ResourceType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PaloAuth;
@@ -39,12 +40,14 @@ public class CreateResourceStmtTest {
     private Analyzer analyzer;
     private String resourceName1;
     private String resourceName2;
+    private String resourceName3;
 
     @Before()
     public void setUp() {
         analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
         resourceName1 = "spark0";
         resourceName2 = "odbc";
+        resourceName3 = "s3";
     }
 
     @Test
@@ -74,6 +77,14 @@ public class CreateResourceStmtTest {
         Assert.assertEquals(Resource.ResourceType.ODBC_CATALOG, stmt.getResourceType());
         Assert.assertEquals("CREATE EXTERNAL RESOURCE 'odbc' PROPERTIES(\"type\"  =  \"odbc_catalog\")", stmt.toSql());
 
+        properties = Maps.newHashMap();
+        properties.put("type", "s3");
+        stmt = new CreateResourceStmt(true, resourceName3, properties);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(resourceName3, stmt.getResourceName());
+        Assert.assertEquals(ResourceType.S3, stmt.getResourceType());
+        Assert.assertEquals("CREATE EXTERNAL RESOURCE 's3' PROPERTIES(\"type\"  =  \"s3\")", stmt.toSql());
+
     }
 
     @Test(expected = AnalysisException.class)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DataPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DataPropertyTest.java
index 7bb4eac665..701dfdcc95 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DataPropertyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DataPropertyTest.java
@@ -33,8 +33,9 @@ public class DataPropertyTest {
         dataProperty = new DataProperty(TStorageMedium.SSD);
         Assert.assertNotEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
 
-        dataProperty = new DataProperty(TStorageMedium.SSD, System.currentTimeMillis() + 24 * 3600 * 1000L);
-        Assert.assertEquals(System.currentTimeMillis() + 24 * 3600 * 1000L, dataProperty.getCooldownTimeMs());
+        long storageCooldownTimeMs = System.currentTimeMillis() + 24 * 3600 * 1000L;
+        dataProperty = new DataProperty(TStorageMedium.SSD, storageCooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
+        Assert.assertEquals(storageCooldownTimeMs, dataProperty.getCooldownTimeMs());
 
         dataProperty = new DataProperty(TStorageMedium.HDD);
         Assert.assertEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ResourceMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ResourceMgrTest.java
index c15c7c1790..20e4785a2b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ResourceMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ResourceMgrTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.analysis.DropResourceStmt;
@@ -36,35 +37,67 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class ResourceMgrTest {
-    private String name;
-    private String type;
+    // spark resource
     private String master;
+    private String sparkResName;
+    private String sparkRestype;
     private String workingDir;
     private String broker;
-    private Map<String, String> properties;
+    private Map<String, String> sparkProperties;
+    // s3 resource
+    private String s3ResName;
+    private String s3ResType;
+    private String s3Endpoint;
+    private String s3Region;
+    private String s3RootPath;
+    private String s3AccessKey;
+    private String s3SecretKey;
+    private String s3MaxConnections;
+    private String s3ReqTimeoutMs;
+    private String s3ConnTimeoutMs;
+    private Map<String, String> s3Properties;
     private Analyzer analyzer;
 
     @Before
     public void setUp() {
-        name = "spark0";
-        type = "spark";
+        sparkResName = "spark0";
+        sparkRestype = "spark";
         master = "spark://127.0.0.1:7077";
         workingDir = "hdfs://127.0.0.1/tmp/doris";
         broker = "broker0";
-        properties = Maps.newHashMap();
-        properties.put("type", type);
-        properties.put("spark.master", master);
-        properties.put("spark.submit.deployMode", "cluster");
-        properties.put("working_dir", workingDir);
-        properties.put("broker", broker);
+        sparkProperties = Maps.newHashMap();
+        sparkProperties.put("type", sparkRestype);
+        sparkProperties.put("spark.master", master);
+        sparkProperties.put("spark.submit.deployMode", "cluster");
+        sparkProperties.put("working_dir", workingDir);
+        sparkProperties.put("broker", broker);
+
+        s3ResName = "s30";
+        s3ResType = "s3";
+        s3Endpoint = "aaa";
+        s3Region = "bj";
+        s3RootPath = "/path/to/root";
+        s3AccessKey = "xxx";
+        s3SecretKey = "yyy";
+        s3MaxConnections = "50";
+        s3ReqTimeoutMs = "3000";
+        s3ConnTimeoutMs = "1000";
+        s3Properties = new HashMap<>();
+        s3Properties.put("type", s3ResType);
+        s3Properties.put("s3_endpoint", s3Endpoint);
+        s3Properties.put("s3_region", s3Region);
+        s3Properties.put("s3_root_path", s3RootPath);
+        s3Properties.put("s3_access_key", s3AccessKey);
+        s3Properties.put("s3_secret_key", s3SecretKey);
         analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
     }
 
     @Test
-    public void testAddDropResource(@Injectable BrokerMgr brokerMgr, @Injectable EditLog editLog,
+    public void testAddAlterDropResource(@Injectable BrokerMgr brokerMgr, @Injectable EditLog editLog,
                                     @Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
         new Expectations() {
             {
@@ -81,22 +114,54 @@ public class ResourceMgrTest {
             }
         };
 
+        // spark resource
         // add
         ResourceMgr mgr = new ResourceMgr();
-        CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
+        CreateResourceStmt stmt = new CreateResourceStmt(true, sparkResName, sparkProperties);
         stmt.analyze(analyzer);
         Assert.assertEquals(0, mgr.getResourceNum());
         mgr.createResource(stmt);
         Assert.assertEquals(1, mgr.getResourceNum());
-        Assert.assertTrue(mgr.containsResource(name));
-        SparkResource resource = (SparkResource) mgr.getResource(name);
+        Assert.assertTrue(mgr.containsResource(sparkResName));
+        SparkResource resource = (SparkResource) mgr.getResource(sparkResName);
         Assert.assertNotNull(resource);
         Assert.assertEquals(broker, resource.getBroker());
 
+        // alter
+        workingDir = "hdfs://127.0.0.1/tmp/doris_new";
+        Map<String, String> copiedSparkProperties = Maps.newHashMap(sparkProperties);
+        copiedSparkProperties.put("working_dir", workingDir);
+        copiedSparkProperties.remove("spark.master");
+        AlterResourceStmt alterResourceStmt = new AlterResourceStmt(sparkResName, copiedSparkProperties);
+        mgr.alterResource(alterResourceStmt);
+        Assert.assertEquals(workingDir, ((SparkResource) mgr.getResource(sparkResName)).getWorkingDir());
+
         // drop
-        DropResourceStmt dropStmt = new DropResourceStmt(name);
+        DropResourceStmt dropStmt = new DropResourceStmt(sparkResName);
         mgr.dropResource(dropStmt);
         Assert.assertEquals(0, mgr.getResourceNum());
+
+        // s3 resource
+        stmt = new CreateResourceStmt(true, s3ResName, s3Properties);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(0, mgr.getResourceNum());
+        mgr.createResource(stmt);
+        Assert.assertEquals(1, mgr.getResourceNum());
+
+        // alter
+        s3Region = "sh";
+        Map<String, String> copiedS3Properties = Maps.newHashMap(s3Properties);
+        copiedS3Properties.put("s3_region", s3Region);
+        copiedS3Properties.remove("type");
+        alterResourceStmt = new AlterResourceStmt(s3ResName, copiedS3Properties);
+        mgr.alterResource(alterResourceStmt);
+        Assert.assertEquals(s3Region, ((S3Resource) mgr.getResource(s3ResName)).getProperty("s3_region"));
+
+        // drop
+        dropStmt = new DropResourceStmt(s3ResName);
+        mgr.dropResource(dropStmt);
+        Assert.assertEquals(0, mgr.getResourceNum());
+
     }
 
     @Test(expected = DdlException.class)
@@ -117,7 +182,7 @@ public class ResourceMgrTest {
 
         // add
         ResourceMgr mgr = new ResourceMgr();
-        CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
+        CreateResourceStmt stmt = new CreateResourceStmt(true, sparkResName, sparkProperties);
         stmt.analyze(analyzer);
         Assert.assertEquals(0, mgr.getResourceNum());
         mgr.createResource(stmt);
@@ -132,7 +197,7 @@ public class ResourceMgrTest {
         // drop
         ResourceMgr mgr = new ResourceMgr();
         Assert.assertEquals(0, mgr.getResourceNum());
-        DropResourceStmt stmt = new DropResourceStmt(name);
+        DropResourceStmt stmt = new DropResourceStmt(sparkResName);
         mgr.dropResource(stmt);
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
new file mode 100644
index 0000000000..9e1f88817b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateResourceStmt;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.UserException;
+import org.apache.doris.meta.MetaContext;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class S3ResourceTest {
+    private String name;
+    private String type;
+
+    private String s3Endpoint;
+    private String s3Region;
+    private String s3RootPath;
+    private String s3AccessKey;
+    private String s3SecretKey;
+    private String s3MaxConnections;
+    private String s3ReqTimeoutMs;
+    private String s3ConnTimeoutMs;
+    private Map<String, String> s3Properties;
+
+    private Analyzer analyzer;
+
+    @Before
+    public void setUp() {
+        name = "s3";
+        type = "s3";
+        s3Endpoint = "aaa";
+        s3Region = "bj";
+        s3RootPath = "/path/to/root";
+        s3AccessKey = "xxx";
+        s3SecretKey = "yyy";
+        s3MaxConnections = "50";
+        s3ReqTimeoutMs = "3000";
+        s3ConnTimeoutMs = "1000";
+        s3Properties = new HashMap<>();
+        s3Properties.put("type", type);
+        s3Properties.put("s3_endpoint", s3Endpoint);
+        s3Properties.put("s3_region", s3Region);
+        s3Properties.put("s3_root_path", s3RootPath);
+        s3Properties.put("s3_access_key", s3AccessKey);
+        s3Properties.put("s3_secret_key", s3SecretKey);
+
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+    }
+
+    @Test
+    public void testFromStmt(@Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
+        new Expectations() {
+            {
+                catalog.getAuth();
+                result = auth;
+                auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
+                result = true;
+            }
+        };
+
+        // resource with default settings
+        CreateResourceStmt stmt = new CreateResourceStmt(true, name, s3Properties);
+        stmt.analyze(analyzer);
+        S3Resource s3Resource = (S3Resource) Resource.fromStmt(stmt);
+        Assert.assertEquals(name, s3Resource.getName());
+        Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
+        Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
+        Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
+        Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
+        Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
+        Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
+        Assert.assertEquals(s3MaxConnections, s3Resource.getProperty("s3_max_connections"));
+        Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty("s3_request_timeout_ms"));
+        Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty("s3_connection_timeout_ms"));
+
+        // with no default settings
+        s3Properties.put("s3_max_connections", "100");
+        s3Properties.put("s3_request_timeout_ms", "2000");
+        s3Properties.put("s3_connection_timeout_ms", "2000");
+        stmt = new CreateResourceStmt(true, name, s3Properties);
+        stmt.analyze(analyzer);
+
+        s3Resource = (S3Resource) Resource.fromStmt(stmt);
+        Assert.assertEquals(name, s3Resource.getName());
+        Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
+        Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
+        Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
+        Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
+        Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
+        Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
+        Assert.assertEquals("100", s3Resource.getProperty("s3_max_connections"));
+        Assert.assertEquals("2000", s3Resource.getProperty("s3_request_timeout_ms"));
+        Assert.assertEquals("2000", s3Resource.getProperty("s3_connection_timeout_ms"));
+    }
+
+    @Test (expected = DdlException.class)
+    public void testAbnormalResource(@Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
+        new Expectations() {
+            {
+                catalog.getAuth();
+                result = auth;
+                auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
+                result = true;
+            }
+        };
+        s3Properties.remove("s3_root_path");
+        CreateResourceStmt stmt = new CreateResourceStmt(true, name, s3Properties);
+        stmt.analyze(analyzer);
+        Resource.fromStmt(stmt);
+    }
+
+    @Test
+    public void testSerialization() throws Exception{
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
+        metaContext.setThreadLocalInfo();
+
+        // 1. write
+        File s3File = new File("./s3Resource");
+        s3File.createNewFile();
+        DataOutputStream s3Dos = new DataOutputStream(new FileOutputStream(s3File));
+
+        S3Resource s3Resource1 = new S3Resource("s3_1");
+        s3Resource1.write(s3Dos);
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("s3_endpoint", "aaa");
+        properties.put("s3_region", "bbb");
+        properties.put("s3_root_path", "/path/to/root");
+        properties.put("s3_access_key", "xxx");
+        properties.put("s3_secret_key", "yyy");
+        S3Resource s3Resource2 = new S3Resource("s3_2");
+        s3Resource2.setProperties(properties);
+        s3Resource2.write(s3Dos);
+
+        s3Dos.flush();
+        s3Dos.close();
+
+        // 2. read
+        DataInputStream s3Dis = new DataInputStream(new FileInputStream(s3File));
+        S3Resource rS3Resource1 = (S3Resource) S3Resource.read(s3Dis);
+        S3Resource rS3Resource2 = (S3Resource) S3Resource.read(s3Dis);
+
+        Assert.assertEquals("s3_1", rS3Resource1.getName());
+        Assert.assertEquals("s3_2", rS3Resource2.getName());
+
+        Assert.assertEquals(rS3Resource2.getProperty("s3_endpoint"), "aaa");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_region"), "bbb");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_root_path"), "/path/to/root");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_access_key"), "xxx");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_secret_key"), "yyy");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_max_connections"), "50");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_request_timeout_ms"), "3000");
+        Assert.assertEquals(rS3Resource2.getProperty("s3_connection_timeout_ms"), "1000");
+
+        // 3. delete
+        s3Dis.close();
+        s3File.delete();
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
index 1d7cb00a39..1521034253 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -37,7 +37,6 @@ import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TStorageMedium;
@@ -59,7 +58,6 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-//import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -67,7 +65,6 @@ import java.util.stream.LongStream;
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Mocked;
-//import static com.google.common.collect.MoreCollectors.onlyElement;
 
 public class DiskRebalanceTest {
     private static final Logger LOG = LogManager.getLogger(DiskRebalanceTest.class);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java
index 913499a726..c741f9aa99 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java
@@ -146,7 +146,7 @@ public class PropertyAnalyzerTest {
 
         Map<String, String> properties = Maps.newHashMap();
         properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, "SSD");
-        properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COLDOWN_TIME, tomorrowTimeStr);
+        properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, tomorrowTimeStr);
         DataProperty dataProperty = PropertyAnalyzer.analyzeDataProperty(properties, new DataProperty(TStorageMedium.SSD));
         // avoid UT fail because time zone different
         DateLiteral dateLiteral = new DateLiteral(tomorrowTimeStr, Type.DATETIME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org