Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/06 08:19:31 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 04d379f277 [To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)
04d379f277 is described below

commit 04d379f277737b69a3869a6f91722a49fd347712
Author: Mani <46...@users.noreply.github.com>
AuthorDate: Thu Oct 6 16:19:26 2022 +0800

    [To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  40 ++
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  28 ++
 docs/UserGuide/Process-Data/Archiving.md           | 155 ++++++++
 docs/zh/UserGuide/Process-Data/Archiving.md        | 148 +++++++
 .../iotdb/db/integration/IoTDBArchivingIT.java     | 330 ++++++++++++++++
 .../resources/conf/iotdb-engine.properties         |   4 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 +
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   9 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  30 ++
 .../db/engine/archiving/ArchivingManager.java      | 426 +++++++++++++++++++++
 .../db/engine/archiving/ArchivingOperate.java      | 107 ++++++
 .../engine/archiving/ArchivingOperateReader.java   | 106 +++++
 .../engine/archiving/ArchivingOperateWriter.java   |  83 ++++
 .../db/engine/archiving/ArchivingRecover.java      | 239 ++++++++++++
 .../iotdb/db/engine/archiving/ArchivingTask.java   | 201 ++++++++++
 .../db/engine/storagegroup/TsFileResource.java     |  29 ++
 .../engine/storagegroup/TsFileResourceStatus.java  |   3 +-
 .../storagegroup/VirtualStorageGroupProcessor.java |  75 ++++
 .../virtualSg/StorageGroupManager.java             |  16 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 136 +++++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   6 +-
 .../db/qp/logical/sys/CancelArchivingOperator.java |  67 ++++
 .../db/qp/logical/sys/PauseArchivingOperator.java  |  64 ++++
 .../db/qp/logical/sys/ResumeArchivingOperator.java |  65 ++++
 .../db/qp/logical/sys/SetArchivingOperator.java    |  94 +++++
 .../db/qp/logical/sys/ShowArchivingOperator.java   |  46 +++
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   3 +-
 .../db/qp/physical/sys/PauseArchivingPlan.java     |  65 ++++
 .../iotdb/db/qp/physical/sys/SetArchivingPlan.java | 148 +++++++
 .../physical/sys/ShowArchivingPlan.java}           |  28 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   1 +
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 122 ++++++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   7 +
 .../ArchivingOperateWriterReaderTest.java          | 174 +++++++++
 .../db/engine/archiving/ArchivingRecoverTest.java  | 175 +++++++++
 .../iotdb/db/engine/archiving/ArchivingTest.java   | 375 ++++++++++++++++++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   4 +
 39 files changed, 3621 insertions(+), 10 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 893504909e..5720a95b0f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -46,6 +46,8 @@ ddlStatement
     | showSchemaTemplates | showNodesInSchemaTemplate
     | showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
     | countStorageGroup | countDevices | countTimeseries | countNodes
+    | setArchiving | cancelArchiving | pauseArchiving | resumeArchiving | showArchiving
+    | showAllArchiving
     ;
 
 dmlStatement
@@ -330,6 +332,44 @@ countNodes
     : COUNT NODES prefixPath LEVEL OPERATOR_EQ INTEGER_LITERAL
     ;
 
+// Set Archiving
+setArchiving
+    : SET ARCHIVING TO storageGroup=prefixPath startTime=DATETIME_LITERAL ttl=INTEGER_LITERAL targetDir=STRING_LITERAL
+    | SET ARCHIVING TO setArchivingClause*
+    ;
+
+setArchivingClause
+    : STORAGE_GROUP     OPERATOR_EQ storageGroup=prefixPath
+    | START_TIME        OPERATOR_EQ startTime=DATETIME_LITERAL
+    | TTL               OPERATOR_EQ ttl=INTEGER_LITERAL
+    | TARGET_DIR        OPERATOR_EQ targetDir=STRING_LITERAL
+    ;
+
+// Cancel Archiving
+cancelArchiving
+    : CANCEL ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+    ;
+
+// Pause Archiving
+pauseArchiving
+    : PAUSE ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+    ;
+
+// Unpause/Resume Archiving
+resumeArchiving
+    : RESUME ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+    ;
+
+// Show Archiving
+showArchiving
+    : SHOW ARCHIVING ON prefixPath (COMMA prefixPath)*
+    ;
+
+// Show All Archiving
+showAllArchiving
+    : SHOW ALL ARCHIVING
+    ;
+
 
 /**
  * 3. Data Manipulation Language (DML)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index dff4887a61..1dd1c4ff48 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -73,6 +73,10 @@ APPEND
     : A P P E N D
     ;
 
+ARCHIVING
+    : A R C H I V I N G
+    ;
+
 AS
     : A S
     ;
@@ -109,6 +113,10 @@ CACHE
     : C A C H E
     ;
 
+CANCEL
+    : C A N C E L
+    ;
+
 CHILD
     : C H I L D
     ;
@@ -350,6 +358,10 @@ ORDER
     : O R D E R
     ;
 
+PAUSE
+    : P A U S E
+    ;
+
 PARTITION
     : P A R T I T I O N
     ;
@@ -418,6 +430,10 @@ RESOURCE
     : R E S O U R C E
     ;
 
+RESUME
+    : R E S U M E
+    ;
+
 REVOKE
     : R E V O K E
     ;
@@ -466,10 +482,18 @@ SOFFSET
     : S O F F S E T
     ;
 
+START_TIME
+    : S T A R T '_' T I M E
+    ;
+
 STORAGE
     : S T O R A G E
     ;
 
+STORAGE_GROUP
+    : S T O R A G E '_' G R O U P
+    ;
+
 START
     : S T A R T
     ;
@@ -486,6 +510,10 @@ TAGS
     : T A G S
     ;
 
+TARGET_DIR
+    : T A R G E T '_' D I R
+    ;
+
 TASK
     : T A S K
     ;
diff --git a/docs/UserGuide/Process-Data/Archiving.md b/docs/UserGuide/Process-Data/Archiving.md
new file mode 100644
index 0000000000..bb2409b41e
--- /dev/null
+++ b/docs/UserGuide/Process-Data/Archiving.md
@@ -0,0 +1,155 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Archiving Data
+
+The data archiving tools consist of five CLI commands: `set`, `cancel`, `pause`, `resume`, and `show`. Users can
+create archiving tasks with `set`; each task starts at a user-specified date and archives expired data (data whose
+timestamp is before the expire time) into a user-specified target directory. Users can then perform further
+operations on a task, such as `pause` or `cancel`.
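+
+The archiving statements described below can also be issued programmatically. The following is a minimal sketch using
+the IoTDB JDBC driver; the connection parameters, the storage group `root.ln`, and the target directory `/tmp/archive`
+are illustrative assumptions:
+
+```java
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class ArchivingExample {
+  public static void main(String[] args) throws Exception {
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // submit a task that archives root.ln data older than 360000 ms
+      statement.execute(
+          "SET ARCHIVING TO storage_group=root.ln start_time=2023-01-01 ttl=360000 target_dir=\"/tmp/archive\"");
+      // list all archiving tasks; column 1 is the task id, column 4 the status
+      try (ResultSet resultSet = statement.executeQuery("SHOW ALL ARCHIVING")) {
+        while (resultSet.next()) {
+          System.out.println(resultSet.getString(1) + " -> " + resultSet.getString(4));
+        }
+      }
+    }
+  }
+}
+```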
+
+## SQL statements
+
+### Show Archiving Tasks
+
+Shows the data archiving tasks.
+
+#### Syntax
+
+```sql
+SHOW ALL ARCHIVING
+SHOW ARCHIVING ON <storage_group>
+```
+
+- `<storage_group>` specifies the storage group whose archiving tasks are shown.
+
+#### Example Result
+
+```sql
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+|task id|                  submit time|storage group|status|                   start time|expire time(ms)|target directory|
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+|      0|2022-01-01T00:00:00.000+08:00|      root.ln| READY|2023-01-01T00:00:00.000+08:00|         360000|            /tmp|
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+```
+
+### Set Data Archiving Task
+
+Submits a data archiving task.
+
+#### Syntax
+
+```sql
+SET ARCHIVING TO <storage_group> <start_time> <ttl> <target_dir>
+SET ARCHIVING TO storage_group=<storage_group> start_time=<start_time> ttl=<ttl> target_dir=<target_dir>
+```
+
+- `<storage_group>` specifies the storage group on which to run the archiving task.
+- `<start_time>` specifies the date on which the archiving task starts.
+- `<ttl>` specifies the time-to-live for the task's data in milliseconds; data with `timestamp < now - ttl` is
+  archived (see the sketch below).
+- `<target_dir>` specifies the target directory to which archived data is moved, given as a string path.
+
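+As a sanity check on the `ttl` semantics, this plain-Java sketch (the values are illustrative) shows which timestamps
+fall past the expiry cutoff:
+
+```java
+public class TtlCutoff {
+  public static void main(String[] args) {
+    long now = System.currentTimeMillis();
+    long ttl = 360000L; // milliseconds
+    long cutoff = now - ttl;
+    // a data point is expired, and therefore archived, once timestamp < now - ttl
+    System.out.println((now - 600000L) < cutoff); // true:  10 minutes old
+    System.out.println((now - 60000L) < cutoff);  // false: 1 minute old
+  }
+}
+```
+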
+#### Example
+
+```sql
+SET ARCHIVING TO storage_group=root.ln start_time=2023-01-01 ttl=360000 target_dir="/tmp"
+SET ARCHIVING TO root.ln 2023-01-01 360000 "/tmp"
+```
+
+#### Tips
+
+- The `A=` prefixes (such as `storage_group=`) can be omitted; when omitted, the values must appear in the order
+  shown above.
+- The start time is in ISO 8601 format, so fields such as hour, minute, and second can be omitted; omitted fields
+  default to 0 (see the sketch below).
+- The `SET` command can submit archiving tasks for multiple storage groups at once via path patterns such as
+  `root.ln.**`.
+
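+The omitted-fields rule above can be illustrated with `java.time` (a sketch of ISO 8601 defaulting behaviour, not
+IoTDB's internal parsing code):
+
+```java
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+public class StartTimeDefaulting {
+  public static void main(String[] args) {
+    // "2023-01-01" carries no time-of-day, so hour/minute/second default to 0
+    LocalDateTime startTime = LocalDate.parse("2023-01-01").atStartOfDay();
+    System.out.println(startTime); // 2023-01-01T00:00
+  }
+}
+```
+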
+### Cancel Archiving Task
+
+Stops and deletes a data archiving task. (Note: data that has already been archived is not moved back into the
+database.)
+
+#### Syntax
+
+```sql
+CANCEL ARCHIVING <task_id>
+CANCEL ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the ID of the archiving task to cancel.
+- `<storage_group>` specifies the storage group whose archiving task is canceled; if several exist, the one with the
+  earliest start time is canceled.
+
+#### Example
+
+```sql
+CANCEL ARCHIVING 0
+CANCEL ARCHIVING ON root.ln
+```
+
+### Pause Archiving Task
+
+Suspends a data archiving task; run the `RESUME` command to resume it.
+
+#### Syntax
+
+```sql
+PAUSE ARCHIVING <task_id>
+PAUSE ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the ID of the archiving task to pause.
+- `<storage_group>` specifies the storage group whose archiving task is paused; if several exist, the one with the
+  earliest start time is paused.
+
+#### Example
+
+```sql
+PAUSE ARCHIVING 0
+PAUSE ARCHIVING ON root.ln
+```
+
+### Resume Archiving Task
+
+Resumes a suspended data archiving task.
+
+#### Syntax
+
+```sql
+RESUME ARCHIVING <task_id>
+RESUME ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the ID of the archiving task to resume.
+- `<storage_group>` specifies the storage group whose archiving task is resumed; if several exist, the one with the
+  earliest start time is resumed.
+
+#### Example
+
+```sql
+RESUME ARCHIVING 0
+RESUME ARCHIVING ON root.ln
+```
+
+## System Parameter Configuration
+
+| Name                   | Description                                                              | Data Type | Default Value |
+|------------------------|--------------------------------------------------------------------------|-----------|---------------|
+| `archiving_thread_num` | The number of threads in the thread pool that executes archiving tasks. | int       | 2             |
diff --git a/docs/zh/UserGuide/Process-Data/Archiving.md b/docs/zh/UserGuide/Process-Data/Archiving.md
new file mode 100644
index 0000000000..e5eaecce1a
--- /dev/null
+++ b/docs/zh/UserGuide/Process-Data/Archiving.md
@@ -0,0 +1,148 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Data Archiving
+
+The data archiving feature provides five CLI commands: show, submit, cancel, pause, and resume archiving tasks.
+Users can create archiving tasks; each task starts at a user-specified time and archives expired data into a
+user-specified directory.
+
+## SQL Statements
+
+### Show Data Archiving Tasks
+
+Shows the data archiving tasks.
+
+#### Syntax
+
+```sql
+SHOW ALL ARCHIVING
+SHOW ARCHIVING ON <storage_group>
+```
+
+- `<storage_group>` returns the task parameters and status for the specified storage group.
+
+#### Example Result
+
+```sql
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+|task id|                  submit time|storage group|status|                   start time|expire time(ms)|target directory|
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+|      0|2022-01-01T00:00:00.000+08:00|      root.ln| READY|2023-01-01T00:00:00.000+08:00|         360000|            /tmp|
++-------+-----------------------------+-------------+------+-----------------------------+---------------+----------------+
+```
+
+### Submit Data Archiving Task
+
+Submits a data archiving task.
+
+#### Syntax
+
+```sql
+SET ARCHIVING TO <storage_group> <start_time> <ttl> <target_dir>
+SET ARCHIVING TO storage_group=<storage_group> start_time=<start_time> ttl=<ttl> target_dir=<target_dir>
+```
+
+- `<storage_group>` specifies the storage group to archive.
+- `<start_time>` specifies the time at which the archiving task starts to execute.
+- `<ttl>` specifies the data expiry period in milliseconds; data with `timestamp < now - ttl` is considered expired.
+- `<target_dir>` specifies the target path to which the archived data files are moved, given as a string.
+
+#### Example
+
+```sql
+SET ARCHIVING TO storage_group=root.ln start_time=2023-01-01 ttl=360000 target_dir="/tmp"
+SET ARCHIVING TO root.ln 2023-01-01 360000 "/tmp"
+```
+
+#### Tips
+
+- The `A=` prefixes (such as `storage_group=`) can be omitted; when omitted, the values must appear in the order
+  shown above.
+- The start time is in ISO 8601 format, so fields such as hour, minute, and second can be omitted; omitted fields
+  default to 0.
+- Archiving tasks can be submitted for all storage groups with path patterns such as `root.ln.**`.
+
+### Cancel Data Archiving Task
+
+Stops and deletes a data archiving task. (Note: data that has already been archived is not moved back into the
+database.)
+
+#### Syntax
+
+```sql
+CANCEL ARCHIVING <task_id>
+CANCEL ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` the ID of the archiving task.
+- `<storage_group>` the storage group whose archiving task is canceled; if several exist, the task with the earliest
+  start time is taken.
+
+#### Example
+
+```sql
+CANCEL ARCHIVING 0
+CANCEL ARCHIVING ON root.ln
+```
+
+### Pause Data Archiving Task
+
+Suspends a data archiving task; the `RESUME` command makes a suspended task run again.
+
+#### Syntax
+
+```sql
+PAUSE ARCHIVING <task_id>
+PAUSE ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` the ID of the archiving task.
+- `<storage_group>` the storage group whose archiving task is paused; if several exist, the task with the earliest
+  start time is taken.
+
+#### Example
+
+```sql
+PAUSE ARCHIVING 0
+PAUSE ARCHIVING ON root.ln
+```
+
+### Resume Data Archiving Task
+
+Makes a suspended data archiving task run again.
+
+#### Syntax
+
+```sql
+RESUME ARCHIVING <task_id>
+RESUME ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` the ID of the archiving task.
+- `<storage_group>` the storage group whose archiving task is resumed; if several exist, the task with the earliest
+  start time is taken.
+
+#### Example
+
+```sql
+RESUME ARCHIVING 0
+RESUME ARCHIVING ON root.ln
+```
+
+## System Parameter Configuration
+
+| Name                   | Description                                      | Data Type | Default Value |
+|------------------------|--------------------------------------------------|-----------|---------------|
+| `archiving_thread_num` | Number of threads used by data archiving tasks. | int       | 2             |
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
new file mode 100644
index 0000000000..a46e3b99ed
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterTest;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBArchivingIT {
+  File testTargetDir;
+  final long ARCHIVING_CHECK_TIME = 60L;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeTest();
+
+    testTargetDir = new File("testTargetDir");
+    testTargetDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterTest();
+
+    FileUtils.deleteDirectory(testTargetDir);
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testArchiving() throws SQLException, InterruptedException {
+    StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try {
+        statement.execute(
+            "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+      try {
+        statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+      try {
+        statement.execute("PAUSE ARCHIVING ON root.ARCHIVING_SG1");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+      try {
+        statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
+      statement.execute(
+          "CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+      try {
+        statement.execute("SET ARCHIVING TO storage_group=root.ARCHIVING_SG1");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.METADATA_ERROR.getStatusCode(), e.getErrorCode());
+      }
+
+      // test set when ttl is in range
+
+      long now = System.currentTimeMillis();
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+                now - 100000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      // test set when ttl isn't in range
+      StorageEngine.getInstance().syncCloseAllProcessor();
+
+      statement.execute(
+          "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 1000 '" + testTargetDir.getPath() + "'");
+
+      Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(0, cnt);
+      }
+
+      // test pause archive
+
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+                now - 5000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      StorageEngine.getInstance().syncCloseAllProcessor();
+
+      statement.execute(
+          "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 100000 '"
+              + testTargetDir.getPath()
+              + "'");
+
+      Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      StorageEngine.getInstance().syncCloseAllProcessor();
+
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+      statement.execute(
+          "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+      statement.execute("PAUSE ARCHIVING ON root.ARCHIVING_SG1");
+
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+      Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      StorageEngine.getInstance().syncCloseAllProcessor();
+
+      // test resume archive
+      statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
+
+      Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(0, cnt);
+      }
+
+      // test cancel archive
+
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+                now - 5000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      StorageEngine.getInstance().syncCloseAllProcessor();
+
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+      statement.execute(
+          "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+      statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
+
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+      Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+    }
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testShowArchiving() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+      statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG2");
+      statement.execute(
+          "CREATE TIMESERIES root.ARCHIVING_SG2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+      statement.execute(
+          "SET ARCHIVING TO root.ARCHIVING_SG2 2000-12-13 100 '" + testTargetDir.getPath() + "'");
+
+      ResultSet resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+
+      boolean flag = false;
+
+      while (resultSet.next()) {
+        if (resultSet.getString(3).equals("root.ARCHIVING_SG2")) {
+          flag = true;
+          assertEquals("READY", resultSet.getString(4));
+          assertTrue(resultSet.getString(5).startsWith("2000-12-13"));
+          assertEquals(100, resultSet.getLong(6));
+          assertEquals(testTargetDir.getPath(), resultSet.getString(7));
+        }
+      }
+
+      assertTrue(flag);
+    }
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testSetArchive() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+      statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG3");
+      statement.execute(
+          "CREATE TIMESERIES root.ARCHIVING_SG3.s3 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+      for (int i = 0; i < 1000; i++) {
+        // test concurrent set
+        statement.execute(
+            String.format(
+                "SET ARCHIVING TO root.ARCHIVING_SG3 2000-12-13 %d '%s'",
+                i, testTargetDir.getPath()));
+      }
+
+      ResultSet resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+      Set<Integer> checkTaskId = new HashSet<>();
+      Set<Integer> checkTTL = new HashSet<>();
+
+      while (resultSet.next()) {
+        if (resultSet.getString(3).equals("root.ARCHIVING_SG3")) {
+          int taskId = Integer.parseInt(resultSet.getString(1));
+          int ttl = Integer.parseInt(resultSet.getString(6));
+          assertFalse(checkTaskId.contains(taskId));
+          checkTaskId.add(taskId);
+          assertFalse(checkTTL.contains(ttl));
+          checkTTL.add(ttl);
+        }
+      }
+
+      assertEquals(1000, checkTaskId.size());
+      assertEquals(1000, checkTTL.size());
+
+      for (int i : checkTaskId) {
+        // test concurrent cancel
+        statement.execute(String.format("CANCEL ARCHIVING %d", i));
+      }
+
+      resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+
+      while (resultSet.next()) {
+        if (resultSet.getString(3).equals("root.ARCHIVING_SG3")) {
+          assertEquals("CANCELED", resultSet.getString(4));
+        }
+      }
+    }
+  }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e17a900a25..ba8ef4b390 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -275,6 +275,10 @@ timestamp_precision=ms
 # Datatype: int
 # concurrent_query_thread=16
 
+# How many threads can concurrently run archiving tasks. When <= 0, use default number of 2.
+# Datatype: int
+# archiving_thread_num=2
+
 # How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
 # Datatype: int
 # concurrent_sub_rawQuery_thread=8
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index e94b5af126..1fb8168c6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -71,6 +71,8 @@ public enum ThreadName {
   CLUSTER_DATA_HEARTBEAT_RPC_SERVICE("ClusterDataHeartbeatRPC"),
   CLUSTER_DATA_HEARTBEAT_RPC_CLIENT("ClusterDataHeartbeatRPC-Client"),
   Cluster_Monitor("ClusterMonitor"),
+  ARCHIVING_CHECK("Archiving-Check"),
+  ARCHIVING_TASK("Archiving-Task"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f4cfc65920..ae52a526a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -864,6 +864,9 @@ public class IoTDBConfig {
   // The max record num returned in one schema query.
   private int schemaQueryFetchSize = 10000000;
 
+  /** number of threads given to archiving tasks */
+  private int archivingThreadNum = 2;
+
   // customizedProperties, this should be empty by default.
   private Properties customizedProperties = new Properties();
 
@@ -2750,6 +2753,14 @@ public class IoTDBConfig {
     this.schemaQueryFetchSize = schemaQueryFetchSize;
   }
 
+  public int getArchivingThreadNum() {
+    return archivingThreadNum;
+  }
+
+  public void setArchivingThreadNum(int archivingThreadNum) {
+    this.archivingThreadNum = archivingThreadNum;
+  }
+
   public double getWriteProportion() {
     return writeProportion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 1294002707..f2e9aed088 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -92,6 +92,13 @@ public class IoTDBConstant {
   public static final String COLUMN_LOCK_INFO = "lock holder";
   public static final String COLUMN_TTL = "ttl";
 
+  public static final String COLUMN_TASK_ID = "task id";
+  public static final String COLUMN_SUBMIT_TIME = "submit time";
+  public static final String COLUMN_ARCHIVING_START_TIME = "start time";
+  public static final String COLUMN_ARCHIVING_STATUS = "status";
+  public static final String COLUMN_ARCHIVING_TARGET_DIRECTORY = "target directory";
+  public static final String COLUMN_EXPIRE_TIME = "expire time(ms)";
+
   public static final String COLUMN_TASK_NAME = "task name";
   public static final String COLUMN_CREATED_TIME = "created time";
   public static final String COLUMN_PROGRESS = "progress";
@@ -154,6 +161,8 @@ public class IoTDBConstant {
   public static final String EXT_FOLDER_NAME = "ext";
   public static final String UDF_FOLDER_NAME = "udf";
   public static final String TRIGGER_FOLDER_NAME = "trigger";
+  public static final String ARCHIVING_FOLDER_NAME = "archiving";
+  public static final String ARCHIVING_LOG_FOLDER_NAME = "archiving_task";
 
   // Operation Sync folder name
   public static final String OPERATION_SYNC_FOLDER_NAME = "operationsync";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 815113f6b4..a9c3fd99d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -481,6 +481,15 @@ public class IoTDBDescriptor {
       conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
     }
 
+    conf.setArchivingThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "archiving_thread_num", Integer.toString(conf.getArchivingThreadNum()))));
+
+    if (conf.getArchivingThreadNum() <= 0) {
+      conf.setArchivingThreadNum(2);
+    }
+
     conf.setRawQueryBlockingQueueCapacity(
         Integer.parseInt(
             properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6ef91ccb28..4c2226a4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.doublelive.OperationSyncLogService;
 import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
 import org.apache.iotdb.db.doublelive.OperationSyncProducer;
 import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
+import org.apache.iotdb.db.engine.archiving.ArchivingOperate;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -169,6 +171,8 @@ public class StorageEngine implements IService {
   ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
       arrayListBlockQueue;
 
+  private ArchivingManager archivingManager = ArchivingManager.getInstance();
+
   private StorageEngine() {
     if (isEnableOperationSync) {
       // Open OperationSync
@@ -587,6 +591,7 @@ public class StorageEngine implements IService {
     shutdownTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
     recoveryThreadPool.shutdownNow();
     processorMap.clear();
+    archivingManager.close();
   }
 
   private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
@@ -1322,6 +1327,31 @@ public class StorageEngine implements IService {
     }
   }
 
+  /** push the archiving info to archivingManager */
+  public void setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    boolean result = archivingManager.setArchiving(storageGroup, targetDir, ttl, startTime);
+    if (result) {
+      logger.info("set archiving task successfully.");
+    } else {
+      logger.info("set archiving task failed.");
+    }
+  }
+
+  public void operateArchiving(
+      ArchivingOperate.ArchivingOperateType operateType, long taskId, PartialPath storageGroup) {
+    if (taskId >= 0) {
+      archivingManager.operate(operateType, taskId);
+    } else if (storageGroup != null) {
+      archivingManager.operate(operateType, storageGroup);
+    } else {
+      logger.error("{} archiving cannot recognize taskId or storagegroup", operateType.name());
+    }
+  }
+
+  public ArchivingManager getArchivingManager() {
+    return archivingManager;
+  }
+
   static class InstanceHolder {
 
     private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
new file mode 100644
index 0000000000..9ac1dc3fc9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ArchivingManager keeps track of all archiving tasks and creates the threads that check and run
+ * the ArchivingTasks.
+ */
+public class ArchivingManager {
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingManager.class);
+  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private final ReentrantLock lock = new ReentrantLock();
+  // region lock resources
+  private ArchivingOperateWriter logWriter;
+  // ordering for archivingTasks: dictionary order on (status, startTime, storageGroup, -ttl), tie-broken by taskId
+  private final Set<ArchivingTask> archivingTasks =
+      new TreeSet<>(
+          (task1, task2) -> {
+            int statusDiff = task1.getStatus().ordinal() - task2.getStatus().ordinal();
+            if (statusDiff != 0) return statusDiff;
+            long startTimeDiff = task1.getStartTime() - task2.getStartTime();
+            if (startTimeDiff != 0) return startTimeDiff > 0 ? 1 : -1;
+            int storageGroupDiff = task1.getStorageGroup().compareTo(task2.getStorageGroup());
+            if (storageGroupDiff != 0) return storageGroupDiff;
+            long ttlDiff = task2.getTTL() - task1.getTTL();
+            if (ttlDiff != 0) return ttlDiff > 0 ? 1 : -1;
+            // fall back to the unique taskId so the comparator stays consistent
+            return Long.compare(task1.getTaskId(), task2.getTaskId());
+          });
+  // the current largest ArchivingTask id + 1, used to create new tasks
+  private long currentTaskId = 0;
+  // endregion
+
+  // single thread to iterate through archivingTasks and check start
+  private ScheduledExecutorService archivingTaskCheckThread;
+  // multiple threads to run the tasks
+  private ExecutorService archivingTaskThreadPool;
+
+  private boolean initialized = false;
+  private static final long ARCHIVING_CHECK_INTERVAL = 60 * 1000L;
+
+  private static final File LOG_FILE =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  "log.bin")
+              .toString());
+  private static final File ARCHIVING_LOG_DIR =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+              .toString());
+
+  // singleton
+  private static class ArchivingManagerHolder {
+    private ArchivingManagerHolder() {}
+
+    private static final ArchivingManager INSTANCE = new ArchivingManager();
+  }
+
+  public static ArchivingManager getInstance() {
+    return ArchivingManagerHolder.INSTANCE;
+  }
+
+  public void init() {
+    try {
+      lock.lock();
+
+      if (initialized) {
+        return;
+      }
+
+      // create necessary log files/dirs
+      if (ARCHIVING_LOG_DIR == null) logger.error("ARCHIVING_LOG_DIR is null");
+      if (!ARCHIVING_LOG_DIR.exists()) {
+        if (ARCHIVING_LOG_DIR.mkdirs()) {
+          logger.info("ARCHIVING_LOG_DIR {} created successfully", ARCHIVING_LOG_DIR);
+        } else {
+          logger.error("ARCHIVING_LOG_DIR {} create error", ARCHIVING_LOG_DIR);
+        }
+      }
+      if (!ARCHIVING_LOG_DIR.isDirectory())
+        logger.error("{} already exists but is not a directory", ARCHIVING_LOG_DIR);
+      if (!LOG_FILE.getParentFile().exists()) LOG_FILE.getParentFile().mkdirs();
+      if (!LOG_FILE.exists()) {
+        try {
+          LOG_FILE.createNewFile();
+        } catch (IOException e) {
+          logger.error("{} log file could not be created", LOG_FILE.getName());
+        }
+      }
+
+      // recover
+      ArchivingRecover recover = new ArchivingRecover();
+      recover.recover();
+      this.archivingTasks.addAll(recover.getArchivingTasks());
+      this.currentTaskId = recover.getCurrentTaskId();
+
+      try {
+        logWriter = new ArchivingOperateWriter(LOG_FILE);
+      } catch (FileNotFoundException e) {
+        logger.error("Cannot find/create log for archiving.");
+        return;
+      }
+
+      archivingTaskCheckThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.ARCHIVING_CHECK.getName());
+      archivingTaskCheckThread.scheduleAtFixedRate(
+          this::checkArchivingTasks,
+          ARCHIVING_CHECK_INTERVAL,
+          ARCHIVING_CHECK_INTERVAL,
+          TimeUnit.MILLISECONDS);
+
+      archivingTaskThreadPool =
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              config.getArchivingThreadNum(), ThreadName.ARCHIVING_TASK.getName());
+      logger.info("start archiving check thread successfully.");
+      initialized = true;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /** close all resources used */
+  public void close() {
+    initialized = false;
+    archivingTaskCheckThread.shutdown();
+    archivingTaskThreadPool.shutdown();
+
+    try {
+      logWriter.close();
+    } catch (Exception e) {
+      logger.error("Cannot close archiving log writer, because:", e);
+    }
+
+    for (ArchivingTask task : archivingTasks) {
+      task.close();
+    }
+    archivingTasks.clear();
+    currentTaskId = 0;
+  }
+
+  /** creates a copy of archivingTasks and returns */
+  public List<ArchivingTask> getArchivingTasks() {
+    try {
+      lock.lock();
+      return new ArrayList<>(archivingTasks);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adds an archiving task to archivingTasks.
+   *
+   * @return true if the task was set successfully, false if a duplicate exists or the log write fails
+   */
+  public boolean setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    try {
+      lock.lock();
+
+      // check if there are duplicates
+      for (ArchivingTask archivingTask : archivingTasks) {
+        if (archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())
+            && archivingTask.getTargetDir().equals(targetDir)
+            && archivingTask.getTTL() == ttl
+            && archivingTask.getStartTime() == startTime) {
+          logger.info("archiving task already equals archiving task {}", archivingTask.getTaskId());
+          return false;
+        }
+      }
+
+      ArchivingTask newTask =
+          new ArchivingTask(currentTaskId, storageGroup, targetDir, ttl, startTime);
+      try {
+        logWriter.log(ArchivingOperate.ArchivingOperateType.SET, newTask);
+      } catch (IOException e) {
+        logger.error("write log error");
+        return false;
+      }
+      archivingTasks.add(newTask);
+      currentTaskId++;
+      return true;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /** @return the task status that results from applying archivingOperateType */
+  private ArchivingTask.ArchivingTaskStatus statusFromOperateType(
+      ArchivingOperate.ArchivingOperateType archivingOperateType) {
+    switch (archivingOperateType) {
+      case RESUME:
+        return ArchivingTask.ArchivingTaskStatus.READY;
+      case CANCEL:
+        return ArchivingTask.ArchivingTaskStatus.CANCELED;
+      case START:
+        return ArchivingTask.ArchivingTaskStatus.RUNNING;
+      case PAUSE:
+        return ArchivingTask.ArchivingTaskStatus.PAUSED;
+      case FINISHED:
+        return ArchivingTask.ArchivingTaskStatus.FINISHED;
+      case ERROR:
+        return ArchivingTask.ArchivingTaskStatus.ERROR;
+    }
+    return null;
+  }
+
+  /**
+   * Operate on task (pause, cancel, resume, etc)
+   *
+   * @param archivingOperateType the operator on task
+   * @param taskId taskId of ArchivingTask to operate on
+   * @return true if a task with taskId exists and the operation succeeds
+   */
+  public boolean operate(ArchivingOperate.ArchivingOperateType archivingOperateType, long taskId) {
+    try {
+      lock.lock();
+
+      ArchivingTask task = null;
+      // find matching task
+      for (ArchivingTask archivingTask : archivingTasks) {
+        if (archivingTask.getTaskId() == taskId) {
+          task = archivingTask;
+          break;
+        }
+      }
+      if (task == null) {
+        // no matches
+        return false;
+      }
+
+      return operate(archivingOperateType, task);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Operate on task (pause, cancel, resume, etc)
+   *
+   * @param archivingOperateType the operator on task
+   * @param storageGroup StorageGroup of ArchivingTask to operate on
+   * @return true if a task with storageGroup exists and the operation succeeds
+   */
+  public boolean operate(
+      ArchivingOperate.ArchivingOperateType archivingOperateType, PartialPath storageGroup) {
+    try {
+      lock.lock();
+
+      ArchivingTask task = null;
+      // find matching task
+      for (ArchivingTask archivingTask : archivingTasks) {
+        if (archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())) {
+          task = archivingTask;
+          break;
+        }
+      }
+      if (task == null) {
+        // no matches
+        return false;
+      }
+
+      return operate(archivingOperateType, task);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private boolean operate(
+      ArchivingOperate.ArchivingOperateType archivingOperateType, ArchivingTask task) {
+    // check if task has valid status
+    switch (archivingOperateType) {
+      case SET:
+      case START:
+      case FINISHED:
+      case ERROR:
+        return false;
+      case CANCEL:
+      case PAUSE:
+        // can cancel/pause only when status=READY/RUNNING
+        if (!(task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY
+            || task.getStatus() == ArchivingTask.ArchivingTaskStatus.RUNNING)) {
+          return false;
+        }
+        break;
+      case RESUME:
+        // can resume only when status=PAUSED
+        if (!(task.getStatus() == ArchivingTask.ArchivingTaskStatus.PAUSED)) {
+          return false;
+        }
+        break;
+    }
+
+    // operate
+    switch (archivingOperateType) {
+      case PAUSE:
+      case CANCEL:
+      case RESUME:
+        // write to log
+        try {
+          logWriter.log(archivingOperateType, task);
+        } catch (IOException e) {
+          logger.error("write log error");
+          return false;
+        }
+        task.setStatus(statusFromOperateType(archivingOperateType));
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /** check if any of the archivingTasks can start */
+  public void checkArchivingTasks() {
+    try {
+      lock.lock();
+
+      logger.info("checking archivingTasks");
+      for (ArchivingTask task : archivingTasks) {
+
+        if (task.getStartTime() - DatetimeUtils.currentTime() <= 0
+            && task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY) {
+
+          // skip this task for now: its storage group has no data yet
+          if (!StorageEngine.getInstance().getProcessorMap().containsKey(task.getStorageGroup())) {
+            continue;
+          }
+
+          // set task to running
+          task.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+
+          // push check archivingTask to storageGroupManager, use Runnable to give task to thread
+          // pool
+          archivingTaskThreadPool.execute(
+              () -> {
+                try {
+                  logWriter.log(ArchivingOperate.ArchivingOperateType.START, task);
+                  task.startTask();
+                } catch (IOException e) {
+                  logger.error("write log error");
+                  task.setStatus(ArchivingTask.ArchivingTaskStatus.ERROR);
+                  return;
+                }
+
+                StorageEngine.getInstance()
+                    .getProcessorMap()
+                    .get(task.getStorageGroup())
+                    .checkArchivingTask(task);
+                logger.info("check archiving task successfully.");
+
+                // set state and remove
+                try {
+                  logWriter.log(ArchivingOperate.ArchivingOperateType.FINISHED, task);
+                  task.finish();
+                } catch (IOException e) {
+                  logger.error("write log error");
+                  task.setStatus(ArchivingTask.ArchivingTaskStatus.ERROR);
+                  return;
+                }
+                task.setStatus(ArchivingTask.ArchivingTaskStatus.FINISHED);
+              });
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  // test
+  public void setCheckThreadTime(long checkThreadTime) {
+    archivingTaskCheckThread.shutdown();
+
+    archivingTaskCheckThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.ARCHIVING_CHECK.getName());
+    archivingTaskCheckThread.scheduleAtFixedRate(
+        this::checkArchivingTasks, checkThreadTime, checkThreadTime, TimeUnit.MILLISECONDS);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java
new file mode 100644
index 0000000000..cbe91e82e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** ArchivingOperate is ArchivingOperateType (SET, CANCEL, PAUSE, etc) and an ArchivingTask */
+public class ArchivingOperate {
+  private ArchivingOperateType type;
+  private ArchivingTask task;
+
+  public ArchivingOperate(ArchivingOperateType type, ArchivingTask task) {
+    this.type = type;
+    this.task = new ArchivingTask(task);
+  }
+
+  public ArchivingOperate(ArchivingOperateType type, long taskId) {
+    this.type = type;
+    this.task = new ArchivingTask(taskId);
+  }
+
+  public ArchivingOperateType getType() {
+    return type;
+  }
+
+  public ArchivingTask getTask() {
+    return task;
+  }
+
+  public void serialize(FileOutputStream logFileOutStream) throws IOException {
+    int typeNum = type.ordinal();
+    ReadWriteIOUtils.write((byte) typeNum, logFileOutStream);
+    ReadWriteIOUtils.write(task.getTaskId(), logFileOutStream);
+
+    if (type == ArchivingOperateType.SET) {
+      ReadWriteIOUtils.write(task.getStorageGroup().getFullPath(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getTargetDir().getPath(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getStartTime(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getTTL(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getSubmitTime(), logFileOutStream);
+    }
+
+    logFileOutStream.flush();
+  }
+
+  public static ArchivingOperate deserialize(FileInputStream logFileInStream)
+      throws IOException, IllegalPathException {
+    ArchivingOperateType operateType;
+
+    int typeNum = ReadWriteIOUtils.readByte(logFileInStream);
+    if (typeNum >= 0 && typeNum < ArchivingOperateType.values().length) {
+      operateType = ArchivingOperateType.values()[typeNum];
+    } else {
+      throw new IOException("unrecognized ArchivingOperateType ordinal: " + typeNum);
+    }
+
+    long taskId = ReadWriteIOUtils.readLong(logFileInStream);
+
+    ArchivingTask deserializedTask;
+    if (operateType == ArchivingOperateType.SET) {
+      PartialPath storageGroup = new PartialPath(ReadWriteIOUtils.readString(logFileInStream));
+      String targetDirPath = ReadWriteIOUtils.readString(logFileInStream);
+      File targetDir = FSFactoryProducer.getFSFactory().getFile(targetDirPath);
+      long startTime = ReadWriteIOUtils.readLong(logFileInStream);
+      long ttl = ReadWriteIOUtils.readLong(logFileInStream);
+      long submitTime = ReadWriteIOUtils.readLong(logFileInStream);
+
+      deserializedTask =
+          new ArchivingTask(taskId, storageGroup, targetDir, ttl, startTime, submitTime);
+    } else {
+      deserializedTask = new ArchivingTask(taskId);
+    }
+    return new ArchivingOperate(operateType, deserializedTask);
+  }
+
+  public enum ArchivingOperateType {
+    SET,
+    CANCEL,
+    START,
+    PAUSE,
+    RESUME,
+    FINISHED,
+    ERROR
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java
new file mode 100644
index 0000000000..afa122627e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * ArchivingOperateReader reads binarized ArchivingOperate from file using FileInputStream from head
+ * to tail.
+ */
+public class ArchivingOperateReader implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingOperateReader.class);
+  private final File logFile;
+  private FileInputStream logFileInStream;
+  private ArchivingOperate operate;
+  private long unbrokenLogsSize = 0;
+
+  public ArchivingOperateReader(String logFilePath) throws IOException {
+    this.logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    logFileInStream = new FileInputStream(logFile);
+  }
+
+  public ArchivingOperateReader(File logFile) throws IOException {
+    this.logFile = logFile;
+    logFileInStream = new FileInputStream(logFile);
+  }
+
+  /** @return ArchivingOperate parsed from log file, null if nothing left in file */
+  private ArchivingOperate readOperate() {
+    try {
+      ArchivingOperate log = ArchivingOperate.deserialize(logFileInStream);
+
+      unbrokenLogsSize = logFileInStream.getChannel().position();
+      return log;
+    } catch (IllegalPathException | IOException e) {
+      return null;
+    }
+  }
+
+  public ArchivingOperate next() {
+    ArchivingOperate ret = operate;
+    operate = null;
+    return ret;
+  }
+
+  public boolean hasNext() {
+    if (operate != null) {
+      return true;
+    }
+
+    // try reading
+    operate = readOperate();
+
+    if (operate == null) {
+      truncateBrokenLogs();
+      operate = null;
+      return false;
+    }
+    return true;
+  }
+
+  /** Keeps the first unbrokenLogsSize bytes of the log file and discards the rest */
+  private void truncateBrokenLogs() {
+    try (FileOutputStream outputStream = new FileOutputStream(logFile, true);
+        FileChannel channel = outputStream.getChannel()) {
+      channel.truncate(unbrokenLogsSize);
+    } catch (IOException e) {
+      logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      logFileInStream.close();
+    } catch (IOException e) {
+      logger.error("Failed to close archiving log");
+    }
+  }
+}
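Note the division of labor: hasNext() performs the actual read and truncates the file tail at
the first broken record, so next() is a constant-time handoff. A caller-side sketch (logFile
and the replay step are placeholders):

    try (ArchivingOperateReader reader = new ArchivingOperateReader(logFile)) {
      while (reader.hasNext()) {
        ArchivingOperate op = reader.next();
        // replay op against the in-memory task map
      }
    } catch (Exception e) {
      // close() is declared to throw Exception, so catch broadly here
    }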
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java
new file mode 100644
index 0000000000..01d4ccd60b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * ArchivingOperateWriter appends the binary serialization of each ArchivingOperate to the log
+ * file using a FileOutputStream.
+ */
+public class ArchivingOperateWriter implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingOperateWriter.class);
+  private final File logFile;
+  private FileOutputStream logFileOutStream;
+
+  public ArchivingOperateWriter(String logFileName) throws FileNotFoundException {
+    this(SystemFileFactory.INSTANCE.getFile(logFileName));
+  }
+
+  public ArchivingOperateWriter(File logFile) throws FileNotFoundException {
+    this.logFile = logFile;
+    if (!logFile.exists()) {
+      if (logFile.getParentFile() != null) {
+        if (logFile.getParentFile().mkdirs()) {
+          logger.info("created archiving log folder");
+        } else {
+          logger.info("create archiving log folder failed");
+        }
+      }
+    }
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }
+
+  public void log(ArchivingOperate.ArchivingOperateType type, ArchivingTask task)
+      throws IOException {
+    ArchivingOperate operate;
+    switch (type) {
+      case SET:
+        operate = new ArchivingOperate(ArchivingOperate.ArchivingOperateType.SET, task);
+        operate.serialize(logFileOutStream);
+        break;
+      case CANCEL:
+      case START:
+      case PAUSE:
+      case RESUME:
+      case FINISHED:
+      case ERROR:
+        operate = new ArchivingOperate(type, task.getTaskId());
+        operate.serialize(logFileOutStream);
+        break;
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    logFileOutStream.close();
+  }
+}
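Usage is symmetric with the reader. A sketch recording the lifecycle of one task (logFile and
task are placeholders; the enclosing method must handle Exception because close() declares it):

    try (ArchivingOperateWriter writer = new ArchivingOperateWriter(logFile)) {
      writer.log(ArchivingOperate.ArchivingOperateType.SET, task);   // full record
      writer.log(ArchivingOperate.ArchivingOperateType.START, task); // taskId-only record
    }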
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java
new file mode 100644
index 0000000000..5391e4e7cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ArchivingRecover encapsulates the recovery logic: it restores the ArchivingTasks from the
+ * operation log and finishes archiving the TsFiles of any interrupted task.
+ */
+public class ArchivingRecover {
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingRecover.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final File LOG_FILE =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  "log.bin")
+              .toString());
+  private static final File ARCHIVING_LOG_DIR =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+              .toString());
+  private Map<Long, ArchivingTask> archivingTasks;
+  private long currentTaskId = 0;
+
+  public List<ArchivingTask> recover() {
+    // first recover archiving tsfiles
+    recoverArchivingFiles();
+
+    archivingTasks = new HashMap<>();
+    currentTaskId = 0;
+
+    // read from logReader
+    try (ArchivingOperateReader logReader = new ArchivingOperateReader(LOG_FILE);
+        ArchivingOperateWriter logWriter = new ArchivingOperateWriter(LOG_FILE)) {
+      Set<Long> errorSet = new HashSet<>();
+
+      while (logReader.hasNext()) {
+        ArchivingOperate operate = logReader.next();
+        long taskId = operate.getTask().getTaskId();
+
+        switch (operate.getType()) {
+          case SET:
+            setArchivingFromLog(operate.getTask());
+            break;
+          case CANCEL:
+            operateFromLog(ArchivingOperate.ArchivingOperateType.CANCEL, taskId);
+            break;
+          case START:
+            // if task started but didn't finish, then error occurred
+            errorSet.add(taskId);
+            break;
+          case PAUSE:
+            errorSet.remove(taskId);
+            operateFromLog(ArchivingOperate.ArchivingOperateType.PAUSE, taskId);
+            break;
+          case RESUME:
+            operateFromLog(ArchivingOperate.ArchivingOperateType.RESUME, taskId);
+            break;
+          case FINISHED:
+            // finished task: remove it from the task map and from the potential-error set
+            errorSet.remove(taskId);
+            archivingTasks.remove(taskId);
+            operateFromLog(ArchivingOperate.ArchivingOperateType.FINISHED, taskId);
+            break;
+          case ERROR:
+            // already put error in log
+            errorSet.remove(taskId);
+            operateFromLog(ArchivingOperate.ArchivingOperateType.ERROR, taskId);
+            break;
+          default:
+            logger.error("read archiving log: unknown type");
+        }
+      }
+
+      // for each task in errorSet, the task started but didn't finish (an error)
+      for (long errTaskId : errorSet) {
+        if (archivingTasks.containsKey(errTaskId)) {
+          // write to log and set task in ERROR in memory
+          logWriter.log(ArchivingOperate.ArchivingOperateType.ERROR, archivingTasks.get(errTaskId));
+          operateFromLog(ArchivingOperate.ArchivingOperateType.ERROR, errTaskId);
+        } else {
+          logger.error("unknown error taskId");
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Cannot read log for archiving.");
+    }
+
+    recoverArchivingFiles();
+
+    return new ArrayList<>(archivingTasks.values());
+  }
+
+  /** Adds an archiving task restored from the log to archivingTasks; does not write to the log */
+  public void setArchivingFromLog(ArchivingTask newTask) {
+    if (currentTaskId > newTask.getTaskId()) {
+      logger.error("set archiving error, current index larger than log index");
+    }
+
+    archivingTasks.put(newTask.getTaskId(), newTask);
+    currentTaskId = newTask.getTaskId() + 1;
+  }
+
+  private ArchivingTask.ArchivingTaskStatus statusFromOperateType(
+      ArchivingOperate.ArchivingOperateType archivingOperateType) {
+    switch (archivingOperateType) {
+      case RESUME:
+        return ArchivingTask.ArchivingTaskStatus.READY;
+      case CANCEL:
+        return ArchivingTask.ArchivingTaskStatus.CANCELED;
+      case START:
+        return ArchivingTask.ArchivingTaskStatus.RUNNING;
+      case PAUSE:
+        return ArchivingTask.ArchivingTaskStatus.PAUSED;
+      case FINISHED:
+        return ArchivingTask.ArchivingTaskStatus.FINISHED;
+      case ERROR:
+        return ArchivingTask.ArchivingTaskStatus.ERROR;
+    }
+    return null;
+  }
+
+  /** Applies an archiving operation from the log to archivingTasks; does not write to the log */
+  public boolean operateFromLog(ArchivingOperate.ArchivingOperateType operateType, long taskId) {
+    if (archivingTasks.containsKey(taskId)) {
+      archivingTasks.get(taskId).setStatus(statusFromOperateType(operateType));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /** finish the unfinished ArchivingTasks using the log files under ARCHIVING_LOG_DIR */
+  public void recoverArchivingFiles() {
+    File[] archivingLogFiles = ARCHIVING_LOG_DIR.listFiles();
+    if (archivingLogFiles == null) {
+      // log directory missing or unreadable, nothing to recover
+      return;
+    }
+    for (File logFile : archivingLogFiles) {
+      FileInputStream logFileInput;
+      File targetDir;
+      String tsfilePath;
+      File tsfile;
+
+      try {
+        logFileInput = new FileInputStream(logFile);
+        String targetDirPath = ReadWriteIOUtils.readString(logFileInput);
+
+        targetDir = SystemFileFactory.INSTANCE.getFile(targetDirPath);
+        tsfilePath = ReadWriteIOUtils.readString(logFileInput);
+
+        if (targetDir.exists()) {
+          if (!targetDir.isDirectory()) {
+            logger.error("target dir {} not a directory", targetDirPath);
+            continue;
+          }
+        } else if (!targetDir.mkdirs()) {
+          logger.error("create target dir {} failed", targetDirPath);
+          continue;
+        }
+      } catch (IOException e) {
+        // could not read log file, continue to next log
+        logger.error("ArchivingRecover: log file not found");
+        continue;
+      }
+
+      while (tsfilePath != null && !tsfilePath.isEmpty()) {
+        tsfile = SystemFileFactory.INSTANCE.getFile(tsfilePath);
+
+        TsFileResource resource = new TsFileResource(tsfile);
+        resource.archive(targetDir);
+
+        try {
+          tsfilePath = ReadWriteIOUtils.readString(logFileInput);
+        } catch (IOException e) {
+          // finished reading all tsfile paths
+          break;
+        }
+      }
+
+      try {
+        logFileInput.close();
+      } catch (IOException e) {
+        logger.error("Archiving Log File Error", e);
+        break;
+      }
+
+      if (!logFile.delete()) {
+        logger.warn("failed to delete archiving log {}", logFile.getName());
+      }
+    }
+  }
+
+  public List<ArchivingTask> getArchivingTasks() {
+    return new ArrayList<>(archivingTasks.values());
+  }
+
+  public long getCurrentTaskId() {
+    return currentTaskId;
+  }
+}
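To see how the replay resolves an interrupted task, trace one possible record sequence through
recover() (a sketch; the committed tests live in ArchivingRecoverTest further below):

    // SET(taskId=0)   -> setArchivingFromLog registers the task, currentTaskId becomes 1
    // START(taskId=0) -> taskId 0 enters errorSet
    // <process crashes before FINISHED is logged>
    // on recover():   0 is still in errorSet, so an ERROR record is appended to the log
    //                 and the in-memory task moves to ArchivingTaskStatus.ERROR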
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
new file mode 100644
index 0000000000..4973e3bbb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+
+/** Represents a single archiving task, including its status and on-disk recovery log */
+public class ArchivingTask {
+  private long taskId;
+  private PartialPath storageGroup;
+  private File targetDir;
+  private long startTime;
+  private long ttl;
+  private volatile ArchivingTaskStatus status = ArchivingTaskStatus.READY;
+  private long submitTime;
+
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingTask.class);
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  FileOutputStream logFileOutput = null;
+  private static final File ARCHIVING_LOG_DIR =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+              .toString());
+
+  public ArchivingTask(long taskId) {
+    this.taskId = taskId;
+  }
+
+  public ArchivingTask(ArchivingTask task) {
+    this.taskId = task.getTaskId();
+    this.storageGroup = task.getStorageGroup();
+    this.targetDir = task.getTargetDir();
+    this.ttl = task.getTTL();
+    this.startTime = task.getStartTime();
+    this.submitTime = task.getSubmitTime();
+  }
+
+  public ArchivingTask(
+      long taskId, PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    this.taskId = taskId;
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+    this.submitTime = DatetimeUtils.currentTime();
+  }
+
+  public ArchivingTask(
+      long taskId,
+      PartialPath storageGroup,
+      File targetDir,
+      long ttl,
+      long startTime,
+      long submitTime) {
+    this.taskId = taskId;
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+    this.submitTime = submitTime;
+  }
+
+  /**
+   * Starts the archiving task and writes the target directory to its recovery log.
+   *
+   * @return true if the log was written successfully, false otherwise
+   */
+  public boolean startTask() throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, taskId + ".log");
+    if (logFile.exists()) {
+      // want an empty log file
+      logFile.delete();
+    }
+    if (!logFile.createNewFile()) {
+      // log file doesn't exist but cannot be created
+      return false;
+    }
+
+    logFileOutput = new FileOutputStream(logFile);
+
+    ReadWriteIOUtils.write(targetDir.getAbsolutePath(), logFileOutput);
+    logFileOutput.flush();
+
+    return true;
+  }
+
+  /**
+   * Records in the recovery log that archiving of the tsfile and its resource/mod files started.
+   *
+   * @return true if the log was written successfully, false otherwise
+   */
+  public boolean startFile(File tsfile) throws IOException {
+    if (logFileOutput == null) {
+      logger.error("need to run ArchivingTask.startTask before ArchivingTask.start");
+      return false;
+    }
+
+    ReadWriteIOUtils.write(tsfile.getAbsolutePath(), logFileOutput);
+    logFileOutput.flush();
+
+    return true;
+  }
+
+  /** Finishes the archiving task: deletes its recovery log and closes the FileOutputStream */
+  public void finish() {
+    File logFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, taskId + ".log");
+    if (logFile.exists()) {
+      logFile.delete();
+    }
+    this.close();
+  }
+
+  /** release all resources */
+  public void close() {
+    try {
+      if (logFileOutput != null) {
+        logFileOutput.close();
+        logFileOutput = null;
+      }
+    } catch (IOException e) {
+      logger.error("could not close fileoutputstream for task {}", taskId);
+    }
+  }
+
+  // getter and setter functions
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public File getTargetDir() {
+    return targetDir;
+  }
+
+  public long getTTL() {
+    return ttl;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public ArchivingTaskStatus getStatus() {
+    return status;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  public void setStatus(ArchivingTaskStatus status) {
+    this.status = status;
+  }
+
+  public enum ArchivingTaskStatus {
+    READY,
+    RUNNING,
+    PAUSED,
+    CANCELED,
+    ERROR,
+    FINISHED
+  }
+}
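Putting the per-task log methods together, the expected calling sequence looks like the sketch
below (paths and the expiredTsFiles collection are illustrative; the enclosing method must
handle IllegalPathException and IOException):

    ArchivingTask task =
        new ArchivingTask(0, new PartialPath("root.sg1"), new File("/tmp/archiving"), 3_600_000L, 0L);
    if (task.startTask()) { // writes the target directory to 0.log
      for (File tsfile : expiredTsFiles) {
        task.startFile(tsfile); // appends the tsfile path before the physical move
        // ... move the file via TsFileResource.archive(task.getTargetDir()) ...
      }
    }
    task.finish(); // deletes 0.log once every logged file has been moved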
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index bce000b73d..6a991d2446 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -535,6 +535,35 @@ public class TsFileResource {
     return true;
   }
 
+  /**
+   * Move its data file, resource file, and modification file physically.
+   *
+   * @return moved data file
+   */
+  public File archive(File targetDir) {
+    // get the resource and mod files
+    File resourceFile = fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX);
+    File modFile = fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX);
+
+    // get the target file locations
+    File archivedFile = fsFactory.getFile(targetDir, file.getName());
+    File archivedResourceFile = fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX);
+    File archivedModificationFile =
+        fsFactory.getFile(targetDir, file.getName() + ModificationFile.FILE_SUFFIX);
+
+    // move
+    if (file.exists()) {
+      fsFactory.moveFile(file, archivedFile);
+    }
+    if (resourceFile.exists()) {
+      fsFactory.moveFile(resourceFile, archivedResourceFile);
+    }
+    if (modFile.exists()) {
+      fsFactory.moveFile(modFile, archivedModificationFile);
+    }
+    return archivedFile;
+  }
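  // Caller-side sketch (illustrative): archive() moves the data file together with its
  // ".resource" and ".mods" companions when they exist, and returns the relocated data file:
  //   File moved = resource.archive(new File("/tmp/archiving"));
  //   // moved.getParentFile() is now /tmp/archiving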
+
   void moveTo(File targetDir) {
     fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
     fsFactory.moveFile(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
index 42eaf481d3..3c7a0d960d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
@@ -23,5 +23,6 @@ public enum TsFileResourceStatus {
   CLOSED,
   COMPACTION_CANDIDATE,
   COMPACTING,
-  DELETED
+  DELETED,
+  ARCHIVED
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index e0bac29972..b67bcae731 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.SystemStatus;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
 import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverManager;
@@ -1550,6 +1551,80 @@ public class VirtualStorageGroupProcessor {
     }
   }
 
+  /** iterate over TsFiles and move those whose data has outlived the task's TTL to targetDir */
+  public void checkArchivingTask(ArchivingTask task) {
+    if (task.getTTL() == Long.MAX_VALUE) {
+      logger.debug(
+          "{}: Archiving ttl not set, ignore the check",
+          logicalStorageGroupName + "-" + virtualStorageGroupId);
+      return;
+    }
+    long ttlLowerBound = System.currentTimeMillis() - task.getTTL();
+    logger.debug(
+        "{}: Archiving files before {}",
+        logicalStorageGroupName + "-" + virtualStorageGroupId,
+        new Date(ttlLowerBound));
+
+    // copy the lists to avoid concurrent modification caused by deletion
+    List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true));
+    List<TsFileResource> unseqFiles = new ArrayList<>(tsFileManager.getTsFileList(false));
+
+    for (TsFileResource tsFileResource : seqFiles) {
+      if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+        // task stopped running (e.g. the task is paused), return
+        return;
+      }
+      checkArchivingTaskFile(task, tsFileResource, task.getTargetDir(), ttlLowerBound, true);
+    }
+
+    for (TsFileResource tsFileResource : unseqFiles) {
+      if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+        // task stopped running, return
+        return;
+      }
+      checkArchivingTaskFile(task, tsFileResource, task.getTargetDir(), ttlLowerBound, false);
+    }
+  }
+
+  /** archive the file to targetDir if it is closed and all of its data has expired */
+  public void checkArchivingTaskFile(
+      ArchivingTask task,
+      TsFileResource resource,
+      File targetDir,
+      long ttlLowerBound,
+      boolean isSeq) {
+    writeLock("checkArchivingLock");
+    try {
+      if (!resource.isClosed() || (!resource.isDeleted() && resource.stillLives(ttlLowerBound))) {
+        return;
+      }
+
+      resource.setStatus(TsFileResourceStatus.ARCHIVED);
+
+      // ensure that the file is not used by any queries
+      if (resource.tryWriteLock()) {
+        try {
+          // try to archive physical data file
+          tsFileManager.remove(resource, isSeq);
+
+          // start archiving file
+          if (task.startFile(resource.getTsFile())) {
+            resource.archive(targetDir);
+          } else {
+            // could not log the file, skip moving it
+            logger.error("{}: failed to log archiving start, file skipped", resource.getTsFilePath());
+          }
+        } catch (IOException e) {
+          logger.error("{} archiving error", resource.getTsFilePath());
+        } finally {
+          resource.writeUnlock();
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
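  // For readability, the skip condition above is equivalent to archiving only when
  //   resource.isClosed() && (resource.isDeleted() || !resource.stillLives(ttlLowerBound))
  // with ttlLowerBound = System.currentTimeMillis() - task.getTTL(): a closed file is archived
  // once all of its data is older than the TTL window (or the file is already deleted).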
+
   public void timedFlushUnseqMemTable() {
     writeLock("timedFlushUnseqMemTable");
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index fb864ac89f..1d41f41fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.storagegroup.virtualSg;
 
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
@@ -119,6 +120,21 @@ public class StorageGroupManager {
     }
   }
 
+  /** push the archiving check down to all virtual storage group processors */
+  public void checkArchivingTask(ArchivingTask task) {
+    for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+        this.virtualStorageGroupProcessor) {
+      if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+        // task stopped running (e.g. the task is paused), return
+        return;
+      }
+
+      if (virtualStorageGroupProcessor != null) {
+        virtualStorageGroupProcessor.checkArchivingTask(task);
+      }
+    }
+  }
+
   /** push check sequence memtable flush interval down to all sg */
   public void timedFlushSeqMemTable() {
     for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 8c26ec903c..348773cec9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.SystemStatus;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingOperate;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
@@ -107,12 +109,15 @@ import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
 import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
@@ -128,6 +133,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -179,6 +185,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -193,6 +200,9 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_START_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_STATUS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_TARGET_DIRECTORY;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
@@ -204,6 +214,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUE
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_EXPIRE_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
@@ -213,6 +224,8 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_SCHEMA_TEMPLATE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_SUBMIT_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_ID;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
@@ -408,6 +421,12 @@ public class PlanExecutor implements IPlanExecutor {
         return true;
       case SHOW_QUERY_RESOURCE:
         return processShowQueryResource();
+      case SET_ARCHIVING:
+        operateSetArchiving((SetArchivingPlan) plan);
+        return true;
+      case PAUSE_ARCHIVING:
+        operatePauseArchiving((PauseArchivingPlan) plan);
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorName()));
@@ -760,6 +779,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processShowPathsSetSchemaTemplate((ShowPathsSetTemplatePlan) showPlan);
       case PATHS_USING_SCHEMA_TEMPLATE:
         return processShowPathsUsingSchemaTemplate((ShowPathsUsingTemplatePlan) showPlan);
+      case SHOW_ARCHIVING:
+        return processShowArchiving((ShowArchivingPlan) showPlan);
       default:
         throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
     }
@@ -1261,6 +1282,83 @@ public class PlanExecutor implements IPlanExecutor {
     return TriggerRegistrationService.getInstance().show();
   }
 
+  private QueryDataSet processShowArchiving(ShowArchivingPlan showArchivingPlan) {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_TASK_ID, false),
+                new PartialPath(COLUMN_SUBMIT_TIME, false),
+                new PartialPath(COLUMN_STORAGE_GROUP, false),
+                new PartialPath(COLUMN_ARCHIVING_STATUS, false),
+                new PartialPath(COLUMN_ARCHIVING_START_TIME, false),
+                new PartialPath(COLUMN_EXPIRE_TIME, false),
+                new PartialPath(COLUMN_ARCHIVING_TARGET_DIRECTORY, false)),
+            Arrays.asList(
+                TSDataType.INT64,
+                TSDataType.TEXT,
+                TSDataType.TEXT,
+                TSDataType.TEXT,
+                TSDataType.TEXT,
+                TSDataType.INT64,
+                TSDataType.TEXT));
+    Set<PartialPath> selectedSgs = new HashSet<>(showArchivingPlan.getStorageGroups());
+
+    List<ArchivingTask> archivingTaskList =
+        StorageEngine.getInstance().getArchivingManager().getArchivingTasks();
+    int timestamp = 0;
+    for (ArchivingTask task : archivingTaskList) {
+      PartialPath sgName = task.getStorageGroup();
+      if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
+        continue;
+      }
+      RowRecord rowRecord = new RowRecord(timestamp++);
+      Field taskId = new Field(TSDataType.INT64);
+      Field submitTime = new Field(TSDataType.TEXT);
+      Field sg = new Field(TSDataType.TEXT);
+      Field status = new Field(TSDataType.TEXT);
+      Field startTime;
+      Field ttl;
+      Field targetDir = new Field(TSDataType.TEXT);
+
+      // set values for Fields based on tasks
+      taskId.setLongV(task.getTaskId());
+      sg.setBinaryV(new Binary(sgName.getFullPath()));
+      status.setBinaryV(new Binary(task.getStatus().name()));
+      targetDir.setBinaryV(new Binary(task.getTargetDir().getPath()));
+      if (task.getTTL() != Long.MAX_VALUE) {
+        ttl = new Field(TSDataType.INT64);
+        ttl.setLongV(task.getTTL());
+      } else {
+        ttl = null;
+      }
+      ZonedDateTime submitDateTime =
+          DatetimeUtils.convertMillsecondToZonedDateTime(task.getSubmitTime());
+      String submitTimeStr = DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(submitDateTime);
+      submitTime.setBinaryV(new Binary(submitTimeStr));
+      if (task.getStartTime() != Long.MAX_VALUE) {
+        ZonedDateTime startDate =
+            DatetimeUtils.convertMillsecondToZonedDateTime(task.getStartTime());
+        String startTimeStr = DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(startDate);
+        startTime = new Field(TSDataType.TEXT);
+        startTime.setBinaryV(new Binary(startTimeStr));
+      } else {
+        startTime = null;
+      }
+
+      // add to rowRecord
+      rowRecord.addField(taskId);
+      rowRecord.addField(submitTime);
+      rowRecord.addField(sg);
+      rowRecord.addField(status);
+      rowRecord.addField(startTime);
+      rowRecord.addField(ttl);
+      rowRecord.addField(targetDir);
+      listDataSet.putRecord(rowRecord);
+    }
+
+    return listDataSet;
+  }
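  // Resulting SHOW ARCHIVING columns, in order (sketch of the dataset built above):
  //   task id (INT64), submit time (TEXT, ISO offset date-time with ms), storage group (TEXT),
  //   status (TEXT), start time (TEXT, null when unset), expire time / TTL (INT64, null when
  //   unset), target directory (TEXT)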
+
   private void addRowRecordForShowQuery(
       ListDataSet listDataSet, int timestamp, String item, String value) {
     RowRecord rowRecord = new RowRecord(timestamp);
@@ -1598,6 +1696,44 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private void operateSetArchiving(SetArchivingPlan plan) throws QueryProcessException {
+    if (plan.getTargetDir() == null) {
+      // a null targetDir encodes a cancel request
+      StorageEngine.getInstance()
+          .operateArchiving(
+              ArchivingOperate.ArchivingOperateType.CANCEL,
+              plan.getTaskId(),
+              plan.getStorageGroup());
+    } else {
+      try {
+        List<PartialPath> storageGroupPaths =
+            IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup(), plan.isPrefixMatch());
+        for (PartialPath storagePath : storageGroupPaths) {
+          StorageEngine.getInstance()
+              .setArchiving(storagePath, plan.getTargetDir(), plan.getTTL(), plan.getStartTime());
+        }
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      }
+    }
+  }
+
+  private void operatePauseArchiving(PauseArchivingPlan plan) {
+    if (plan.isPause()) {
+      StorageEngine.getInstance()
+          .operateArchiving(
+              ArchivingOperate.ArchivingOperateType.PAUSE,
+              plan.getTaskId(),
+              plan.getStorageGroup());
+    } else {
+      StorageEngine.getInstance()
+          .operateArchiving(
+              ArchivingOperate.ArchivingOperateType.RESUME,
+              plan.getTaskId(),
+              plan.getStorageGroup());
+    }
+  }
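  // Programmatic equivalents of the dispatch above (sketch; normally reached via SQL plans).
  // A cancel arrives as a SetArchivingPlan with a null targetDir, so:
  //   StorageEngine.getInstance().setArchiving(sgPath, targetDir, ttl, startTime); // set
  //   StorageEngine.getInstance().operateArchiving(
  //       ArchivingOperate.ArchivingOperateType.PAUSE, taskId, storageGroup);      // pause
  // RESUME and CANCEL go through the same operateArchiving entry point with a different type.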
+
   @Override
   public void update(PartialPath path, long startTime, long endTime, String value) {
     throw new UnsupportedOperationException("update is not supported now");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 677eadcb51..5708665b83 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -196,6 +196,10 @@ public abstract class Operator {
 
     SHOW_QUERY_RESOURCE,
 
-    DEACTIVATE_TEMPLATE
+    DEACTIVATE_TEMPLATE,
+
+    SET_ARCHIVING,
+    PAUSE_ARCHIVING,
+    SHOW_ARCHIVING
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java
new file mode 100644
index 0000000000..fde4e7bc39
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class CancelArchivingOperator extends Operator {
+  private PartialPath storageGroup = null;
+
+  private long taskId = -1;
+
+  public CancelArchivingOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.SET_ARCHIVING;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    if (storageGroup != null) {
+      return new SetArchivingPlan(storageGroup);
+    } else if (taskId != -1) {
+      return new SetArchivingPlan(taskId);
+    } else {
+      return new SetArchivingPlan();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java
new file mode 100644
index 0000000000..a55d010e28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class PauseArchivingOperator extends Operator {
+  private long taskId = -1;
+  private PartialPath storageGroup;
+
+  public PauseArchivingOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.PAUSE_ARCHIVING;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    if (storageGroup != null) {
+      return new PauseArchivingPlan(storageGroup, true);
+    } else if (taskId != -1) {
+      return new PauseArchivingPlan(taskId, true);
+    } else {
+      return new PauseArchivingPlan(true);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java
new file mode 100644
index 0000000000..9bf53bf237
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class ResumeArchivingOperator extends Operator {
+  private long taskId = -1;
+  private PartialPath storageGroup;
+
+  public ResumeArchivingOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.PAUSE_ARCHIVING;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    if (storageGroup != null) {
+      return new PauseArchivingPlan(storageGroup, false);
+    } else if (taskId != -1) {
+      return new PauseArchivingPlan(taskId, false);
+    } else {
+      return new PauseArchivingPlan(false);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java
new file mode 100644
index 0000000000..0a2b482037
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import java.io.File;
+
+public class SetArchivingOperator extends Operator {
+  private PartialPath storageGroup = null;
+  private File targetDir = null;
+  private Long ttl = null;
+  private Long startTime = null;
+
+  public SetArchivingOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.SET_ARCHIVING;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public File getTargetDir() {
+    return targetDir;
+  }
+
+  public void setTargetDir(File targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  public long getTTL() {
+    return ttl;
+  }
+
+  public void setTTL(long ttl) {
+    this.ttl = ttl;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    if (storageGroup == null) {
+      throw new SQLParserException("storage_group not specified");
+    }
+    if (startTime == null) {
+      throw new SQLParserException("start_time not specified");
+    }
+    if (ttl == null) {
+      throw new SQLParserException("ttl not specified");
+    }
+    if (targetDir == null) {
+      throw new SQLParserException("target_dir not specified");
+    }
+
+    return new SetArchivingPlan(storageGroup, targetDir, ttl, startTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
new file mode 100644
index 0000000000..575be7c063
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import java.util.List;
+
+public class ShowArchivingOperator extends ShowOperator {
+  private List<PartialPath> storageGroups;
+
+  public ShowArchivingOperator(List<PartialPath> storageGroups) {
+    super(SQLConstant.TOK_SHOW, OperatorType.SHOW_ARCHIVING);
+    this.storageGroups = storageGroups;
+  }
+
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator) {
+    return new ShowArchivingPlan(storageGroups);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 2857c45752..443561c703 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -562,7 +562,8 @@ public abstract class PhysicalPlan {
     APPEND_TEMPLATE,
     PRUNE_TEMPLATE,
     DROP_TEMPLATE,
-    DEACTIVATE_TEMPLATE
+    DEACTIVATE_TEMPLATE,
+    ARCHIVING
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java
new file mode 100644
index 0000000000..cdbc63ca7a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PauseArchivingPlan extends PhysicalPlan {
+  private long taskId = -1;
+  private PartialPath storageGroup;
+  private boolean pause = true;
+
+  public PauseArchivingPlan(boolean pause) {
+    super(Operator.OperatorType.PAUSE_ARCHIVING);
+    this.pause = pause;
+  }
+
+  public PauseArchivingPlan(PartialPath storageGroup, boolean pause) {
+    super(Operator.OperatorType.PAUSE_ARCHIVING);
+    this.storageGroup = storageGroup;
+    this.pause = pause;
+  }
+
+  public PauseArchivingPlan(long taskId, boolean pause) {
+    super(Operator.OperatorType.PAUSE_ARCHIVING);
+    this.taskId = taskId;
+    this.pause = pause;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public boolean isPause() {
+    return pause;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+}
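The three constructors mirror the three scopes a pause can target; as a sketch:

    new PauseArchivingPlan(true)          // pause every archiving task
    new PauseArchivingPlan(sgPath, true)  // pause the tasks of one storage group
    new PauseArchivingPlan(taskId, true)  // pause a single task by id

Passing false instead turns the same plan into a RESUME, which is how ResumeArchivingOperator
above reuses this class.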
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java
new file mode 100644
index 0000000000..b686a452df
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class SetArchivingPlan extends PhysicalPlan {
+  private long taskId = -1;
+  private PartialPath storageGroup;
+  private File targetDir;
+  private long ttl;
+  private long startTime;
+
+  public SetArchivingPlan() {
+    super(OperatorType.SET_ARCHIVING);
+  }
+
+  public SetArchivingPlan(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    super(OperatorType.SET_ARCHIVING);
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+  }
+
+  public SetArchivingPlan(
+      long taskId, PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    // set archiving w/ taskId
+    super(OperatorType.SET_ARCHIVING);
+    this.taskId = taskId;
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+  }
+
+  public SetArchivingPlan(PartialPath storageGroup) {
+    // cancel archiving using storage group
+    this(storageGroup, null, Long.MAX_VALUE, Long.MAX_VALUE);
+  }
+
+  public SetArchivingPlan(long taskId) {
+    // cancel archiving using taskId
+    this(taskId, null, null, Long.MAX_VALUE, Long.MAX_VALUE);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
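+    // wire format: plan type byte | ttl | startTime | storageGroup | targetDir | taskId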
+    int type = PhysicalPlanType.ARCHIVING.ordinal();
+    stream.writeByte((byte) type);
+    stream.writeLong(ttl);
+    stream.writeLong(startTime);
+    putString(stream, storageGroup.getFullPath());
+    putString(stream, targetDir.getAbsolutePath());
+
+    stream.writeLong(taskId);
+  }
+
+  @Override
+  public void serializeImpl(ByteBuffer buffer) {
+    int type = PhysicalPlanType.ARCHIVING.ordinal();
+    buffer.put((byte) type);
+    buffer.putLong(ttl);
+    buffer.putLong(startTime);
+    putString(buffer, storageGroup.getFullPath());
+    putString(buffer, targetDir.getAbsolutePath());
+
+    buffer.putLong(taskId);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    this.ttl = buffer.getLong();
+    this.startTime = buffer.getLong();
+    this.storageGroup = new PartialPath(readString(buffer));
+    this.targetDir = new File(readString(buffer));
+
+    this.taskId = buffer.getLong();
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public File getTargetDir() {
+    return targetDir;
+  }
+
+  public void setTargetDir(File targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  public long getTTL() {
+    return ttl;
+  }
+
+  public void setTTL(long ttl) {
+    this.ttl = ttl;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
index 42eaf481d3..1b7cd70549 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
@@ -16,12 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.storagegroup;
+package org.apache.iotdb.db.qp.physical.sys;
 
-public enum TsFileResourceStatus {
-  UNCLOSED,
-  CLOSED,
-  COMPACTION_CANDIDATE,
-  COMPACTING,
-  DELETED
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.List;
+
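+/** Physical plan for SHOW ARCHIVING; an empty storage group list means all tasks are shown. */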
+public class ShowArchivingPlan extends ShowPlan {
+  private List<PartialPath> storageGroups;
+
+  public ShowArchivingPlan(List<PartialPath> storageGroups) {
+    super(ShowContentType.SHOW_ARCHIVING);
+    this.storageGroups = storageGroups;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
+
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 021814e1b8..35e5f77132 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -127,5 +127,6 @@ public class ShowPlan extends PhysicalPlan {
     NODES_IN_SCHEMA_TEMPLATE,
     PATHS_SET_SCHEMA_TEMPLATE,
     PATHS_USING_SCHEMA_TEMPLATE,
+    SHOW_ARCHIVING
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 70c737a6ee..47dd2c1a77 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -106,6 +106,8 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.StringContainer;
 
@@ -785,6 +787,126 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
     return operator;
   }
 
+  // Set Archiving
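+  // e.g. SET ARCHIVING TO root.ln 2023-01-01 10000 '/data/archive'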
+  @Override
+  public Operator visitSetArchiving(IoTDBSqlParser.SetArchivingContext ctx) {
+    SetArchivingOperator operator = new SetArchivingOperator(SQLConstant.TOK_SET);
+
+    if (ctx.storageGroup != null) {
+      operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+    }
+    if (ctx.ttl != null) {
+      operator.setTTL(Long.parseLong(ctx.ttl.getText()));
+    }
+    if (ctx.startTime != null) {
+      operator.setStartTime(parseDateFormat(ctx.startTime.getText()));
+    }
+    if (ctx.targetDir != null) {
+      operator.setTargetDir(parseAndCheckTargetDir(ctx.targetDir.getText()));
+    }
+
+    // parse the setArchivingClause
+    for (IoTDBSqlParser.SetArchivingClauseContext setArchivingClauseContext :
+        ctx.setArchivingClause()) {
+      parseSetArchivingClause(operator, setArchivingClauseContext);
+    }
+
+    return operator;
+  }
+
+  private void parseSetArchivingClause(
+      SetArchivingOperator operator, IoTDBSqlParser.SetArchivingClauseContext ctx) {
+    if (ctx.storageGroup != null) {
+      operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+    }
+    if (ctx.ttl != null) {
+      operator.setTTL(Long.parseLong(ctx.ttl.getText()));
+    }
+    if (ctx.startTime != null) {
+      operator.setStartTime(parseDateFormat(ctx.startTime.getText()));
+    }
+    if (ctx.targetDir != null) {
+      operator.setTargetDir(parseAndCheckTargetDir(ctx.targetDir.getText()));
+    }
+  }
+
+  /** Resolves a target directory literal and checks that it is an existing directory. */
+  private File parseAndCheckTargetDir(String targetDirText) {
+    FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+    File targetDir = fsFactory.getFile(parseStringLiteral(targetDirText));
+    if (!targetDir.exists()) {
+      throw new SQLParserException("target directory does not exist: " + targetDir.getPath());
+    } else if (!targetDir.isDirectory()) {
+      throw new SQLParserException("target path is not a directory: " + targetDir.getPath());
+    }
+    return targetDir;
+  }
+
+  // Cancel Archiving
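+  // e.g. CANCEL ARCHIVING ON root.ln  or  CANCEL ARCHIVING <taskId>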
+  @Override
+  public Operator visitCancelArchiving(IoTDBSqlParser.CancelArchivingContext ctx) {
+    CancelArchivingOperator operator = new CancelArchivingOperator(SQLConstant.TOK_UNSET);
+    if (ctx.storageGroup != null) {
+      operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+    } else if (ctx.taskId != null) {
+      operator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+    } else {
+      throw new SQLParserException(
+          "CANCEL ARCHIVING requires either a storage group or a task id");
+    }
+    return operator;
+  }
+
+  // Pause Archiving
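+  // e.g. PAUSE ARCHIVING ON root.ln  or  PAUSE ARCHIVING <taskId>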
+  @Override
+  public Operator visitPauseArchiving(IoTDBSqlParser.PauseArchivingContext ctx) {
+    PauseArchivingOperator operator = new PauseArchivingOperator(SQLConstant.TOK_SET);
+    if (ctx.storageGroup != null) {
+      operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+    } else if (ctx.taskId != null) {
+      operator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+    } else {
+      throw new SQLParserException(
+          "PAUSE ARCHIVING requires either a storage group or a task id");
+    }
+    return operator;
+  }
+
+  // Resume Archiving
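+  // e.g. RESUME ARCHIVING ON root.ln  or  RESUME ARCHIVING <taskId>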
+  @Override
+  public Operator visitResumeArchiving(IoTDBSqlParser.ResumeArchivingContext ctx) {
+    ResumeArchivingOperator operator = new ResumeArchivingOperator(SQLConstant.TOK_UNSET);
+    if (ctx.storageGroup != null) {
+      operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+    } else if (ctx.taskId != null) {
+      operator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+    } else {
+      throw new SQLParserException(
+          "RESUME ARCHIVING requires either a storage group or a task id");
+    }
+    return operator;
+  }
+
+  // Show Archiving
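+  // e.g. SHOW ARCHIVING ON root.ln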
+  @Override
+  public Operator visitShowArchiving(IoTDBSqlParser.ShowArchivingContext ctx) {
+    List<PartialPath> storageGroups = new ArrayList<>();
+    List<IoTDBSqlParser.PrefixPathContext> prefixPathList = ctx.prefixPath();
+    for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPathList) {
+      storageGroups.add(parsePrefixPath(prefixPath));
+    }
+    return new ShowArchivingOperator(storageGroups);
+  }
+
+  // Show All Archiving
+  @Override
+  public Operator visitShowAllArchiving(IoTDBSqlParser.ShowAllArchivingContext ctx) {
+    List<PartialPath> storageGroups = new ArrayList<>();
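+    // an empty storage group list means every archiving task is shown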
+    return new ShowArchivingOperator(storageGroups);
+  }
+
   // Start Trigger
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 1333674805..db237b904e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
@@ -140,6 +141,7 @@ public class IoTDB implements IoTDBMBean {
     logger.info("recover the schema...");
     initMManager();
     initServiceProvider();
+    initArchivingManager();
     registerManager.register(JMXService.getInstance());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -226,6 +228,11 @@ public class IoTDB implements IoTDBMBean {
         IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold());
   }
 
+  private void initArchivingManager() {
+    // recover ArchivingTasks, finish archiving unfinished tsfiles, start check threads
+    ArchivingManager.getInstance().init();
+  }
+
   @Override
   public void stop() {
     deactivate();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java
new file mode 100644
index 0000000000..67b316531c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivingOperateWriterReaderTest {
+  private static final String filePath = "logtest.test";
+  private final String sg1 = "root.ARCHIVING_SG1";
+  private final String sg2 = "root.ARCHIVING_SG2";
+  private long startTime; // 2023-01-01
+  private final long ttl = 2000;
+  private final String targetDirPath = Paths.get("data", "separated").toString();
+  List<ArchivingOperate> archivingOperate;
+  ArchivingTask task1, task2;
+
+  @Before
+  public void prepare() throws IllegalPathException, LogicalOperatorException {
+    if (new File(filePath).exists()) {
+      new File(filePath).delete();
+    }
+    // resolve startTime (2023-01-01) before the tasks are built so they capture it
+    startTime = DatetimeUtils.convertDatetimeStrToLong("2023-01-01", ZoneId.systemDefault());
+    task1 = new ArchivingTask(120, new PartialPath(sg1), new File(targetDirPath), startTime, ttl);
+    task2 = new ArchivingTask(999, new PartialPath(sg2), new File(targetDirPath), startTime, ttl);
+
+    archivingOperate = new ArrayList<>();
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.START, task1));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.SET, task1));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.CANCEL, task2));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.PAUSE, task2));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.RESUME, task2));
+
+    task1.close();
+    task2.close();
+  }
+
+  public void writeLog(ArchivingOperateWriter writer) throws IOException {
+    writer.log(ArchivingOperate.ArchivingOperateType.START, task1);
+    writer.log(ArchivingOperate.ArchivingOperateType.SET, task1);
+    writer.log(ArchivingOperate.ArchivingOperateType.CANCEL, task2);
+    writer.log(ArchivingOperate.ArchivingOperateType.PAUSE, task2);
+    writer.log(ArchivingOperate.ArchivingOperateType.RESUME, task2);
+  }
+
+  /** Checks whether two logs have equal fields. */
+  public boolean logEquals(ArchivingOperate log1, ArchivingOperate log2) {
+    if (log1.getType() != log2.getType()) {
+      return false;
+    }
+    if (log1.getTask().getTaskId() != log2.getTask().getTaskId()) {
+      return false;
+    }
+
+    if (log1.getType() == ArchivingOperate.ArchivingOperateType.SET) {
+      // check other fields only if SET
+      if (log1.getTask().getStartTime() != log2.getTask().getStartTime()) {
+        return false;
+      }
+      if (log1.getTask().getTTL() != log2.getTask().getTTL()) {
+        return false;
+      }
+      if (!log1.getTask()
+          .getStorageGroup()
+          .getFullPath()
+          .equals(log2.getTask().getStorageGroup().getFullPath())) {
+        return false;
+      }
+      if (!log1.getTask()
+          .getTargetDir()
+          .getPath()
+          .equals(log2.getTask().getTargetDir().getPath())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Test
+  public void testWriteAndRead() throws Exception {
+    ArchivingOperateWriter writer = new ArchivingOperateWriter(filePath);
+    writeLog(writer);
+    // close the writer first so all log entries are flushed before the reader opens the file
+    writer.close();
+    try (ArchivingOperateReader reader = new ArchivingOperateReader(new File(filePath))) {
+      List<ArchivingOperate> res = new ArrayList<>();
+      while (reader.hasNext()) {
+        res.add(reader.next());
+      }
+      for (int i = 0; i < archivingOperate.size(); i++) {
+        assertTrue(logEquals(archivingOperate.get(i), res.get(i)));
+      }
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  @Test
+  public void testTruncateBrokenLogs() throws Exception {
+    try {
+      // write normal data
+      try (ArchivingOperateWriter writer = new ArchivingOperateWriter(filePath)) {
+        writeLog(writer);
+      }
+      long expectedLength = new File(filePath).length();
+
+      // write a broken tail: the length prefix claims 120 bytes but only 80 bytes of
+      // garbage follow, so the reader should detect the corruption and truncate it
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 20; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.capacity());
+        lengthBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      try (ArchivingOperateReader reader = new ArchivingOperateReader(new File(filePath))) {
+        List<ArchivingOperate> res = new ArrayList<>();
+        while (reader.hasNext()) {
+          res.add(reader.next());
+        }
+        for (int i = 0; i < archivingOperate.size(); i++) {
+          assertTrue(logEquals(archivingOperate.get(i), res.get(i)));
+        }
+      }
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java
new file mode 100644
index 0000000000..e1092a35dd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivingRecoverTest {
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final File ARCHIVING_LOG_DIR =
+      SystemFileFactory.INSTANCE.getFile(
+          Paths.get(
+                  FilePathUtils.regularizePath(config.getSystemDir()),
+                  IoTDBConstant.ARCHIVING_FOLDER_NAME,
+                  IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+              .toString());
+  private long testTaskId = 99;
+  private File testLogFile;
+  private List<File> testFiles;
+  private File testTargetDir;
+
+  @Before
+  public void setUp()
+      throws MetadataException, StorageGroupProcessorException, LogicalOperatorException {
+    EnvironmentUtils.envSetUp();
+
+    testLogFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, testTaskId + ".log");
+
+    testTargetDir = new File("testTargetDir");
+    testTargetDir.mkdirs();
+
+    testFiles = new ArrayList<>();
+    testFiles.add(new File("test.tsfile"));
+    testFiles.add(new File("test.tsfile.resource"));
+    testFiles.add(new File("test.tsfile.mods"));
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+
+    FileUtils.deleteDirectory(testTargetDir);
+    deleteTestFiles();
+  }
+
+  private void setupTestFiles() throws IOException {
+    for (File file : testFiles) {
+      if (!file.exists()) {
+        file.createNewFile();
+      }
+    }
+  }
+
+  private void deleteTestFiles() {
+    for (File file : testFiles) {
+      if (file.exists()) {
+        file.delete();
+      }
+    }
+  }
+
+  private void cleanupTargetDir() {
+    for (File file : testTargetDir.listFiles()) {
+      file.delete();
+    }
+  }
+
+  private File getTsFile() {
+    return testFiles.get(0);
+  }
+
+  @Test
+  public void testWriteAndRead() throws Exception {
+    // create test files
+    setupTestFiles();
+
+    // test write
+    ArchivingTask task = new ArchivingTask(testTaskId, null, testTargetDir, 0, 0);
+    task.startTask();
+    task.startFile(getTsFile());
+    task.close();
+
+    assertTrue(testLogFile.exists());
+
+    // the task log records the target dir followed by the absolute path of the archived tsfile
+    try (FileInputStream fileInputStream = new FileInputStream(testLogFile)) {
+      assertEquals(testTargetDir.getAbsolutePath(), ReadWriteIOUtils.readString(fileInputStream));
+      assertEquals(getTsFile().getAbsolutePath(), ReadWriteIOUtils.readString(fileInputStream));
+      assertEquals(0, fileInputStream.available());
+    }
+
+    // test read
+    ArchivingRecover recover = new ArchivingRecover();
+    recover.recover();
+
+    for (File file : testFiles) {
+      assertFalse(file.exists());
+    }
+
+    assertFalse(testLogFile.exists());
+
+    assertEquals(3, testTargetDir.listFiles().length);
+    cleanupTargetDir();
+  }
+
+  @Test
+  public void testMissingFiles() throws IOException {
+    // test missing .tsfile .resource .mods
+    File missingTsFile = new File("testMissing.tsfile");
+    File missingTsFileRes = new File("testMissing.tsfile.resource");
+    File missingTsFileMods = new File("testMissing.tsfile.mods");
+
+    assertFalse(missingTsFile.exists());
+    assertFalse(missingTsFileRes.exists());
+    assertFalse(missingTsFileMods.exists());
+
+    testLogFile.createNewFile();
+
+    try (FileOutputStream logOutput = new FileOutputStream(testLogFile)) {
+      ReadWriteIOUtils.write(testTargetDir.getAbsolutePath(), logOutput);
+      ReadWriteIOUtils.write(missingTsFile.getAbsolutePath(), logOutput);
+    }
+
+    // test read
+    ArchivingRecover recover = new ArchivingRecover();
+    recover.recover();
+
+    assertEquals(0, testTargetDir.listFiles().length);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
new file mode 100644
index 0000000000..7a6a5eb626
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ArchivingTest {
+  private final String sg1 = "root.ARCHIVING_SG1";
+  private final String sg2 = "root.ARCHIVING_SG2";
+  private final long ttl = 12345;
+  private long startTime; // 2023-01-01
+  private VirtualStorageGroupProcessor virtualStorageGroupProcessor;
+  private final String s1 = "s1";
+  private final String g1s1 = sg1 + IoTDBConstant.PATH_SEPARATOR + s1;
+  private long prevPartitionInterval;
+  private File targetDir;
+
+  @Before
+  public void setUp()
+      throws MetadataException, StorageGroupProcessorException, LogicalOperatorException {
+    prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(86400);
+    EnvironmentUtils.envSetUp();
+    createSchemas();
+    targetDir = new File("separated_test");
+    targetDir.mkdirs();
+
+    startTime = DatetimeUtils.convertDatetimeStrToLong("2023-01-01", ZoneId.systemDefault());
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+    File[] movedFiles = targetDir.listFiles();
+    if (movedFiles != null) {
+      for (File file : movedFiles) {
+        file.delete();
+      }
+    }
+    targetDir.delete();
+  }
+
+  private void createSchemas() throws MetadataException, StorageGroupProcessorException {
+    virtualStorageGroupProcessor =
+        new VirtualStorageGroupProcessor(
+            IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
+            sg1,
+            new DirectFlushPolicy(),
+            sg1);
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath(g1s1),
+        TSDataType.INT64,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        Collections.emptyMap());
+  }
+
+  private void prepareData()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    InsertRowPlan plan = new InsertRowPlan();
+    plan.setDevicePath(new PartialPath(sg1));
+    plan.setTime(System.currentTimeMillis());
+    plan.setMeasurements(new String[] {"s1"});
+    plan.setDataTypes(new TSDataType[] {TSDataType.INT64});
+    plan.setValues(new Object[] {1L});
+    plan.setMeasurementMNodes(
+        new IMeasurementMNode[] {
+          MeasurementMNode.getMeasurementMNode(
+              null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
+        });
+    plan.transferType();
+
+    long initTime = System.currentTimeMillis();
+    // sequence data
+    for (int i = 1000; i < 2000; i++) {
+      plan.setTime(initTime - 2000 + i);
+      virtualStorageGroupProcessor.insert(plan);
+      if ((i + 1) % 300 == 0) {
+        virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+      }
+    }
+    // unsequence data
+    for (int i = 0; i < 1000; i++) {
+      plan.setTime(initTime - 2000 + i);
+      virtualStorageGroupProcessor.insert(plan);
+      if ((i + 1) % 300 == 0) {
+        virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+      }
+    }
+  }
+
+  @Test
+  public void testArchiving()
+      throws StorageEngineException, WriteProcessException, QueryProcessException,
+          IllegalPathException, IOException {
+    prepareData();
+
+    virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+
+    // files before ttl
+    File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
+    File unseqDir = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), sg1);
+
+    List<File> seqFiles = new ArrayList<>();
+    for (File directory : seqDir.listFiles()) {
+      if (directory.isDirectory()) {
+        for (File file : directory.listFiles()) {
+          if (file.isDirectory()) {
+            for (File tsfile : file.listFiles()) {
+              if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+                seqFiles.add(tsfile);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    List<File> unseqFiles = new ArrayList<>();
+    for (File directory : unseqDir.listFiles()) {
+      if (directory.isDirectory()) {
+        for (File file : directory.listFiles()) {
+          if (file.isDirectory()) {
+            for (File tsfile : file.listFiles()) {
+              if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+                unseqFiles.add(tsfile);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    assertEquals(4, seqFiles.size());
+    assertEquals(4, unseqFiles.size());
+
+    try {
+      // let the closed files age before the archiving task runs
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      fail("interrupted while waiting before archiving");
+    }
+    // create a new ArchivingTask with specified params
+    ArchivingTask task = new ArchivingTask(0, new PartialPath(sg1), targetDir, 500, 0);
+    task.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+    task.startTask();
+    virtualStorageGroupProcessor.checkArchivingTask(task);
+    task.close();
+
+    // files after archiving
+    seqFiles = new ArrayList<>();
+    for (File directory : seqDir.listFiles()) {
+      if (directory.isDirectory()) {
+        for (File file : directory.listFiles()) {
+          if (file.isDirectory()) {
+            for (File tsfile : file.listFiles()) {
+              if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+                seqFiles.add(tsfile);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    unseqFiles = new ArrayList<>();
+    for (File directory : unseqDir.listFiles()) {
+      if (directory.isDirectory()) {
+        for (File file : directory.listFiles()) {
+          if (file.isDirectory()) {
+            for (File tsfile : file.listFiles()) {
+              if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+                unseqFiles.add(tsfile);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    assertTrue(seqFiles.size() <= 2);
+    assertEquals(0, unseqFiles.size());
+
+    List<File> targetFiles = new ArrayList<>();
+    for (File tsfile : targetDir.listFiles()) {
+      if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+        targetFiles.add(tsfile);
+      }
+    }
+
+    assertEquals(8, targetFiles.size() + seqFiles.size() + unseqFiles.size());
+  }
+
+  @Test
+  public void testParseSetArchiving() throws QueryProcessException {
+    Planner planner = new Planner();
+    SetArchivingPlan plan =
+        (SetArchivingPlan)
+            planner.parseSQLToPhysicalPlan(
+                String.format(
+                    "SET ARCHIVING TO %s 2023-01-01 10000 '%s'", sg1, targetDir.getPath()));
+    assertEquals(sg1, plan.getStorageGroup().getFullPath());
+    assertEquals(10000, plan.getTTL());
+    assertEquals(startTime, plan.getStartTime());
+    assertEquals(targetDir.getPath(), plan.getTargetDir().getPath());
+
+    plan =
+        (SetArchivingPlan)
+            planner.parseSQLToPhysicalPlan(
+                String.format(
+                    "SET ARCHIVING TO start_time=2023-01-01 storage_group=%s ttl=10000 target_dir='%s'",
+                    sg1, targetDir.getPath()));
+    assertEquals(sg1, plan.getStorageGroup().getFullPath());
+    assertEquals(10000, plan.getTTL());
+    assertEquals(startTime, plan.getStartTime());
+    assertEquals(targetDir.getPath(), plan.getTargetDir().getPath());
+
+    plan = (SetArchivingPlan) planner.parseSQLToPhysicalPlan("CANCEL ARCHIVING ON " + sg2);
+    assertEquals(sg2, plan.getStorageGroup().getFullPath());
+    assertEquals(Long.MAX_VALUE, plan.getTTL());
+    assertEquals(Long.MAX_VALUE, plan.getStartTime());
+
+    plan = (SetArchivingPlan) planner.parseSQLToPhysicalPlan("CANCEL ARCHIVING 99");
+    assertEquals(99, plan.getTaskId());
+    assertEquals(Long.MAX_VALUE, plan.getTTL());
+    assertEquals(Long.MAX_VALUE, plan.getStartTime());
+  }
+
+  @Test
+  public void testParsePauseArchiving() throws QueryProcessException {
+    Planner planner = new Planner();
+    PauseArchivingPlan plan =
+        (PauseArchivingPlan) planner.parseSQLToPhysicalPlan("PAUSE ARCHIVING ON " + sg2);
+    assertEquals(sg2, plan.getStorageGroup().getFullPath());
+    assertTrue(plan.isPause());
+
+    plan = (PauseArchivingPlan) planner.parseSQLToPhysicalPlan("PAUSE ARCHIVING 10");
+    assertEquals(10, plan.getTaskId());
+    assertTrue(plan.isPause());
+
+    plan = (PauseArchivingPlan) planner.parseSQLToPhysicalPlan("RESUME ARCHIVING ON " + sg1);
+    assertEquals(sg1, plan.getStorageGroup().getFullPath());
+    assertFalse(plan.isPause());
+
+    plan = (PauseArchivingPlan) planner.parseSQLToPhysicalPlan("RESUME ARCHIVING 16");
+    assertEquals(16, plan.getTaskId());
+    assertFalse(plan.isPause());
+  }
+
+  @Test
+  public void testParseShowArchiving() throws QueryProcessException {
+    Planner planner = new Planner();
+    ShowArchivingPlan plan =
+        (ShowArchivingPlan) planner.parseSQLToPhysicalPlan("SHOW ALL ARCHIVING");
+    assertTrue(plan.getStorageGroups().isEmpty());
+
+    plan = (ShowArchivingPlan) planner.parseSQLToPhysicalPlan("SHOW ARCHIVING ON " + sg1);
+    assertEquals(sg1, plan.getStorageGroups().get(0).getFullPath());
+  }
+
+  @Test
+  public void testShowArchiving()
+      throws IOException, QueryProcessException, QueryFilterOptimizationException,
+          StorageEngineException, MetadataException, InterruptedException {
+    ArchivingManager archivingManager = ArchivingManager.getInstance();
+    archivingManager.setArchiving(new PartialPath(sg1), targetDir, ttl, startTime);
+
+    ShowArchivingPlan plan = new ShowArchivingPlan(Collections.emptyList());
+    PlanExecutor executor = new PlanExecutor();
+    QueryDataSet queryDataSet = executor.processQuery(plan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+
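+    // result columns (by index): 2 = storage group, 4 = start time, 5 = ttl, 6 = target dir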
+    while (queryDataSet.hasNext()) {
+      RowRecord rowRecord = queryDataSet.next();
+      String sg = rowRecord.getFields().get(2).getStringValue();
+      if (sg.equals(sg1)) {
+        ZonedDateTime startDate = DatetimeUtils.convertMillsecondToZonedDateTime(startTime);
+        assertEquals(
+            DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(startDate),
+            rowRecord.getFields().get(4).getStringValue());
+        assertEquals(ttl, rowRecord.getFields().get(5).getLongV());
+        assertEquals(targetDir.getPath(), rowRecord.getFields().get(6).getStringValue());
+      } else {
+        fail();
+      }
+    }
+  }
+
+  @Test
+  public void testArchivingCleanFile()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    prepareData();
+    virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+
+    assertEquals(4, virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+    assertEquals(4, virtualStorageGroupProcessor.getUnSequenceFileList().size());
+
+    ArchivingTask task = new ArchivingTask(0, new PartialPath(sg1), targetDir, 0, 0);
+    task.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+    virtualStorageGroupProcessor.checkArchivingTask(task);
+
+    assertEquals(0, virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+    assertEquals(0, virtualStorageGroupProcessor.getUnSequenceFileList().size());
+    task.close();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a7bd78165b..b439387eef 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.SystemStatus;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
@@ -143,6 +144,9 @@ public class EnvironmentUtils {
       fail();
     }
 
+    // clear archiving manager
+    ArchivingManager.getInstance().close();
+
     IoTDBDescriptor.getInstance().getConfig().setSystemStatus(SystemStatus.NORMAL);
     // We must disable MQTT service as it will cost a lot of time to be shutdown, which may slow our
     // unit tests.