You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/06 08:19:31 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 04d379f277 [To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)
04d379f277 is described below
commit 04d379f277737b69a3869a6f91722a49fd347712
Author: Mani <46...@users.noreply.github.com>
AuthorDate: Thu Oct 6 16:19:26 2022 +0800
[To rel/0.13][IOTDB-3882] Support Data Archiving (#6989)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 40 ++
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 28 ++
docs/UserGuide/Process-Data/Archiving.md | 155 ++++++++
docs/zh/UserGuide/Process-Data/Archiving.md | 148 +++++++
.../iotdb/db/integration/IoTDBArchivingIT.java | 330 ++++++++++++++++
.../resources/conf/iotdb-engine.properties | 4 +
.../org/apache/iotdb/db/concurrent/ThreadName.java | 2 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 9 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 30 ++
.../db/engine/archiving/ArchivingManager.java | 426 +++++++++++++++++++++
.../db/engine/archiving/ArchivingOperate.java | 107 ++++++
.../engine/archiving/ArchivingOperateReader.java | 106 +++++
.../engine/archiving/ArchivingOperateWriter.java | 83 ++++
.../db/engine/archiving/ArchivingRecover.java | 239 ++++++++++++
.../iotdb/db/engine/archiving/ArchivingTask.java | 201 ++++++++++
.../db/engine/storagegroup/TsFileResource.java | 29 ++
.../engine/storagegroup/TsFileResourceStatus.java | 3 +-
.../storagegroup/VirtualStorageGroupProcessor.java | 75 ++++
.../virtualSg/StorageGroupManager.java | 16 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 136 +++++++
.../org/apache/iotdb/db/qp/logical/Operator.java | 6 +-
.../db/qp/logical/sys/CancelArchivingOperator.java | 67 ++++
.../db/qp/logical/sys/PauseArchivingOperator.java | 64 ++++
.../db/qp/logical/sys/ResumeArchivingOperator.java | 65 ++++
.../db/qp/logical/sys/SetArchivingOperator.java | 94 +++++
.../db/qp/logical/sys/ShowArchivingOperator.java | 46 +++
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 3 +-
.../db/qp/physical/sys/PauseArchivingPlan.java | 65 ++++
.../iotdb/db/qp/physical/sys/SetArchivingPlan.java | 148 +++++++
.../physical/sys/ShowArchivingPlan.java} | 28 +-
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 1 +
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 122 ++++++
.../java/org/apache/iotdb/db/service/IoTDB.java | 7 +
.../ArchivingOperateWriterReaderTest.java | 174 +++++++++
.../db/engine/archiving/ArchivingRecoverTest.java | 175 +++++++++
.../iotdb/db/engine/archiving/ArchivingTest.java | 375 ++++++++++++++++++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +
39 files changed, 3621 insertions(+), 10 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 893504909e..5720a95b0f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -46,6 +46,8 @@ ddlStatement
| showSchemaTemplates | showNodesInSchemaTemplate
| showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| countStorageGroup | countDevices | countTimeseries | countNodes
+ | setArchiving | cancelArchiving | pauseArchiving | resumeArchiving | showArchiving
+ | showAllArchiving
;
dmlStatement
@@ -330,6 +332,44 @@ countNodes
: COUNT NODES prefixPath LEVEL OPERATOR_EQ INTEGER_LITERAL
;
+// Set Archiving
+setArchiving
+ : SET ARCHIVING TO storageGroup=prefixPath startTime=DATETIME_LITERAL ttl=INTEGER_LITERAL targetDir=STRING_LITERAL
+ | SET ARCHIVING TO setArchivingClause*
+ ;
+
+setArchivingClause
+ : STORAGE_GROUP OPERATOR_EQ storageGroup=prefixPath
+ | START_TIME OPERATOR_EQ startTime=DATETIME_LITERAL
+ | TTL OPERATOR_EQ ttl=INTEGER_LITERAL
+ | TARGET_DIR OPERATOR_EQ targetDir=STRING_LITERAL
+ ;
+
+// Cancel Archiving
+cancelArchiving
+ : CANCEL ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+ ;
+
+// Pause Archiving
+pauseArchiving
+ : PAUSE ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+ ;
+
+// Unpause/Resume Archiving
+resumeArchiving
+ : RESUME ARCHIVING (ON storageGroup=prefixPath | taskId=INTEGER_LITERAL)
+ ;
+
+// Show Archiving
+showArchiving
+ : SHOW ARCHIVING ON prefixPath (COMMA prefixPath)*
+ ;
+
+// Show All Archiving
+showAllArchiving
+ : SHOW ALL ARCHIVING
+ ;
+
/**
* 3. Data Manipulation Language (DML)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index dff4887a61..1dd1c4ff48 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -73,6 +73,10 @@ APPEND
: A P P E N D
;
+ARCHIVING
+ : A R C H I V I N G
+ ;
+
AS
: A S
;
@@ -109,6 +113,10 @@ CACHE
: C A C H E
;
+CANCEL
+ : C A N C E L
+ ;
+
CHILD
: C H I L D
;
@@ -350,6 +358,10 @@ ORDER
: O R D E R
;
+PAUSE
+ : P A U S E
+ ;
+
PARTITION
: P A R T I T I O N
;
@@ -418,6 +430,10 @@ RESOURCE
: R E S O U R C E
;
+RESUME
+ : R E S U M E
+ ;
+
REVOKE
: R E V O K E
;
@@ -466,10 +482,18 @@ SOFFSET
: S O F F S E T
;
+START_TIME
+ : S T A R T '_' T I M E
+ ;
+
STORAGE
: S T O R A G E
;
+STORAGE_GROUP
+ : S T O R A G E '_' G R O U P
+ ;
+
START
: S T A R T
;
@@ -486,6 +510,10 @@ TAGS
: T A G S
;
+TARGET_DIR
+ : T A R G E T '_' D I R
+ ;
+
TASK
: T A S K
;
diff --git a/docs/UserGuide/Process-Data/Archiving.md b/docs/UserGuide/Process-Data/Archiving.md
new file mode 100644
index 0000000000..bb2409b41e
--- /dev/null
+++ b/docs/UserGuide/Process-Data/Archiving.md
@@ -0,0 +1,155 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Archiving Data
+
+The archiving data tools consist of 5 Cli commands: `set`, `cancel`, `pause`, `resume`, and `show`. Users may use
+archiving tools to create archiving tasks. These tasks start at the user-specified date and archive expired
+data (timestamp before expire time) into a target directory specified by the user. The user can then perform other
+operations such as `pause` on the tasks.
+
+## SQL statements
+
+### Show Archiving Tasks
+
+Show the data archiving tasks.
+
+#### Syntax
+
+```sql
+SHOW ALL ARCHIVING
+SHOW ARCHIVING ON <storage_group>
+```
+
+- `<storage_group>` specifies the storage group to show archiving task on.
+
+#### Example Result
+
+```sql
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+|task id| submit time|storage group|status| start time|expire time(ms)|target directory|
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+| 0|2022-1-1T00:00:00.000+08:00| root.ln| READY|2023-1-1T00:00:00.000+08:00| 360000| /tmp|
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+```
+
+### Set Data Archiving Task
+
+Submit a data archiving task.
+
+#### Syntax
+
+```sql
+SET ARCHIVING TO <storage_group> <start_time> <ttl> <target_dir>
+SET ARCHIVING TO storage_group=<storage_group> start_time=<start_time> ttl=<ttl> target_dir=<target_dir>
+```
+
+- `<storage_group>` specifies the storage group to set the archiving task on.
+- `<start_time>` specifies the date to start the archiving task.
+- `<ttl>` specifies the expire time for the task, in milliseconds; data with `timestamp < now - ttl` is archived.
+- `<target_dir>` specifies the target directory to move the archived data, uses string for the path.
+
+#### Example
+
+```sql
+SET ARCHIVING TO storage_group=root.ln start_time=2023-01-01 ttl=360000 target_dir="/tmp"
+SET ARCHIVING TO root.ln 2023-01-01 360000 "/tmp"
+```
+
+#### Tips
+
+- The `A=` prefixes (such as `storage_group=`) in the Cli commands can be omitted; after omission, the parameters must
+ appear in the same order as above.
+- The start time is in ISO 8601 format, so information such as hour/minute/second can be omitted, and it is set to 0 by
+ default after being omitted.
+- The `SET` command can submit archiving tasks for all storage groups by using parameters like `root.ln.**`.
+
+### Cancel Archiving Task
+
+Stop and delete the data archiving task. (Note: data that has been archived will not be put back into the database)
+
+#### Syntax
+
+```sql
+CANCEL ARCHIVING <task_id>
+CANCEL ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the id of archiving task to cancel.
+- `<storage_group>` specifies the storage group whose archiving task to cancel; if several exist, the one with the
+ earliest start time is canceled.
+
+#### Example
+
+```sql
+CANCEL ARCHIVING 0
+CANCEL ARCHIVING ON root.ln
+```
+
+### Pause Archiving Task
+
+Suspend the data archiving task; run the `RESUME` command to resume the task.
+
+#### Syntax
+
+```sql
+PAUSE ARCHIVING <task_id>
+PAUSE ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the id of archiving task to pause.
+- `<storage_group>` specifies the storage group whose archiving task to pause; if several exist, the one with the
+ earliest start time is paused.
+
+#### Example
+
+```sql
+PAUSE ARCHIVING 0
+PAUSE ARCHIVING ON root.ln
+```
+
+### Resume Archiving Task
+
+Resume suspended data archiving tasks.
+
+#### Syntax
+
+```sql
+RESUME ARCHIVING <task_id>
+RESUME ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` specifies the id of archiving task to resume.
+- `<storage_group>` specifies the storage group whose archiving task to resume; if several exist, the one with the
+ earliest start time is resumed.
+
+#### Example
+
+```sql
+RESUME ARCHIVING 0
+RESUME ARCHIVING ON root.ln
+```
+
+## System Parameter Configuration
+
+| Name | Description | Data Type | Default Value |
+|:-----------------------|-------------------------------------------------------------------------| --------- | ------------- |
+| `archiving_thread_num` | The number of threads in the thread pool that executes archiving tasks. | int | 2 |
diff --git a/docs/zh/UserGuide/Process-Data/Archiving.md b/docs/zh/UserGuide/Process-Data/Archiving.md
new file mode 100644
index 0000000000..e5eaecce1a
--- /dev/null
+++ b/docs/zh/UserGuide/Process-Data/Archiving.md
@@ -0,0 +1,148 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# 数据归档
+
+数据归档功能提供 5 个 Cli 命令:包括查看、提交、删除、暂停和继续归档任务。
+用户可以创建归档任务,这些归档任务在用户指定的启动时间开始执行,并将过期数据归档到用户指定的目录。
+
+## SQL 语句
+
+### 查看数据归档任务
+
+显示数据归档任务。
+
+#### 语法
+
+```sql
+SHOW ALL ARCHIVING
+SHOW ARCHIVING ON <storage_group>
+```
+
+- `<storage_group>` 返回指定存储组上的任务参数以及状态。
+
+#### 结果示例
+
+```sql
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+|task id| submit time|storage group|status| start time|expire time(ms)|target directory|
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+| 0|2022-1-1T00:00:00.000+08:00| root.ln| READY|2023-1-1T00:00:00.000+08:00| 360000| /tmp|
++-------+---------------------------+-------------+------+---------------------------+---------------+----------------+
+```
+
+### 提交数据归档任务
+
+用户提交数据归档任务。
+
+#### 语法
+
+```sql
+SET ARCHIVING TO <storage_group> <start_time> <ttl> <target_dir>
+SET ARCHIVING TO storage_group=<storage_group> start_time=<start_time> ttl=<ttl> target_dir=<target_dir>
+```
+
+- `<storage_group>` 指定的归档的存储组。
+- `<start_time>` 归档任务开始执行的时间。
+- `<ttl>` 数据过期时长,当数据的时间戳 `timestamp < now - ttl` 则为过期数据,单位为毫秒。
+- `<target_dir>` 数据文件被归档存储的目标路径,使用字符串指定路径。
+
+#### 示例
+
+```sql
+SET ARCHIVING TO storage_group=root.ln start_time=2023-01-01 ttl=360000 target_dir="/tmp"
+SET ARCHIVING TO root.ln 2023-01-01 360000 "/tmp"
+```
+
+#### 提示
+
+- 指令中的 `A=` (比如 `storage_group=`)可以省略,省略后顺序必须和上述一致。
+- 开始时间使用 ISO 8601 格式,因此可以省略时/分/秒等信息,省略后默认设成 0。
+- 可以提交全部存储组的归档任务,使用类似 `root.ln.**`。
+
+### 删除数据归档任务
+
+停止并删除数据归档任务。(注意:已经被归档的数据不会被放回数据库中)
+
+#### 语法
+
+```sql
+CANCEL ARCHIVING <task_id>
+CANCEL ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` 归档任务的索引号。
+- `<storage_group>` 取消归档任务的存储组,如果存在多个则取启动时间最早的任务。
+
+#### 示例
+
+```sql
+CANCEL ARCHIVING 0
+CANCEL ARCHIVING ON root.ln
+```
+
+### 暂停数据归档任务
+
+将数据归档任务挂起,可通过 `RESUME` 命令让任务重新执行。
+
+#### 语法
+
+```sql
+PAUSE ARCHIVING <task_id>
+PAUSE ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` 归档任务的索引号。
+- `<storage_group>` 暂停归档任务的存储组,如果存在多个则取启动时间最早的任务。
+
+#### 示例
+
+```sql
+PAUSE ARCHIVING 0
+PAUSE ARCHIVING ON root.ln
+```
+
+### 继续数据归档任务
+
+让挂起的数据归档任务重新执行。
+
+#### 语法
+
+```sql
+RESUME ARCHIVING <task_id>
+RESUME ARCHIVING ON <storage_group>
+```
+
+- `<task_id>` 归档任务的索引号。
+- `<storage_group>` 继续归档任务的存储组,如果存在多个则取启动时间最早的任务。
+
+#### 示例
+
+```sql
+RESUME ARCHIVING 0
+RESUME ARCHIVING ON root.ln
+```
+
+## 系统参数配置
+
+| 参数名 | 描述 | 数据类型 | 默认值 |
+|:-----------------------| ------------------------ | -------- | ------ |
+| `archiving_thread_num` | 数据归档任务使用的线程数 | int | 2 |
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
new file mode 100644
index 0000000000..a46e3b99ed
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterTest;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBArchivingIT {
+ File testTargetDir;
+ final long ARCHIVING_CHECK_TIME = 60L;
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeTest();
+
+ testTargetDir = new File("testTargetDir");
+ testTargetDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterTest();
+
+ FileUtils.deleteDirectory(testTargetDir);
+ }
+
+ @Test
+ @Category({ClusterTest.class})
+ public void testArchiving() throws SQLException, InterruptedException {
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try {
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+ } catch (SQLException e) {
+ assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+ }
+ try {
+ statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
+ } catch (SQLException e) {
+ assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+ }
+ try {
+ statement.execute("PAUSE ARCHIVING ON root.ARCHIVING_SG1");
+ } catch (SQLException e) {
+ assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+ }
+ try {
+ statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
+ } catch (SQLException e) {
+ assertEquals(TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode(), e.getErrorCode());
+ }
+
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+ try {
+ statement.execute("SET ARCHIVING TO storage_group=root.ARCHIVING_SG1");
+ } catch (SQLException e) {
+ assertEquals(TSStatusCode.METADATA_ERROR.getStatusCode(), e.getErrorCode());
+ }
+
+ // test set when ttl is in range
+
+ long now = System.currentTimeMillis();
+ for (int i = 0; i < 100; i++) {
+ statement.execute(
+ String.format(
+ "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+ now - 100000 + i, i));
+ }
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+
+ // test set when ttl isn't in range
+ StorageEngine.getInstance().syncCloseAllProcessor();
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 1000 '" + testTargetDir.getPath() + "'");
+
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(0, cnt);
+ }
+
+ // test pause archive
+
+ for (int i = 0; i < 100; i++) {
+ statement.execute(
+ String.format(
+ "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+ now - 5000 + i, i));
+ }
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+
+ StorageEngine.getInstance().syncCloseAllProcessor();
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 100000 '"
+ + testTargetDir.getPath()
+ + "'");
+
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+
+ StorageEngine.getInstance().syncCloseAllProcessor();
+
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+ statement.execute("PAUSE ARCHIVING ON root.ARCHIVING_SG1");
+
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+
+ StorageEngine.getInstance().syncCloseAllProcessor();
+
+ // test resume archive
+ statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
+
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(0, cnt);
+ }
+
+ // test cancel archive
+
+ for (int i = 0; i < 100; i++) {
+ statement.execute(
+ String.format(
+ "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d, %d)",
+ now - 5000 + i, i));
+ }
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+
+ StorageEngine.getInstance().syncCloseAllProcessor();
+
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG1 1999-01-01 0 '" + testTargetDir.getPath() + "'");
+ statement.execute("CANCEL ARCHIVING ON root.ARCHIVING_SG1");
+
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+ }
+ }
+
+ @Test
+ @Category({ClusterTest.class})
+ public void testShowArchiving() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG2");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG2 2000-12-13 100 '" + testTargetDir.getPath() + "'");
+
+ ResultSet resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+
+ boolean flag = false;
+
+ while (resultSet.next()) {
+ if (resultSet.getString(3).equals("root.ARCHIVING_SG2")) {
+ flag = true;
+ assertEquals("READY", resultSet.getString(4));
+ assertTrue(resultSet.getString(5).startsWith("2000-12-13"));
+ assertEquals(100, resultSet.getLong(6));
+ assertEquals(testTargetDir.getPath(), resultSet.getString(7));
+ }
+ }
+
+ assertTrue(flag);
+ }
+ }
+
+ @Test
+ @Category({ClusterTest.class})
+ public void testSetArchive() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG3");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG3.s3 WITH DATATYPE=INT32, ENCODING=PLAIN");
+
+ for (int i = 0; i < 1000; i++) {
+ // test concurrent set
+ statement.execute(
+ String.format(
+ "SET ARCHIVING TO root.ARCHIVING_SG3 2000-12-13 %d '%s'",
+ i, testTargetDir.getPath()));
+ }
+
+ ResultSet resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+ Set<Integer> checkTaskId = new HashSet<>();
+ Set<Integer> checkTTL = new HashSet<>();
+
+ while (resultSet.next()) {
+ if (resultSet.getString(3).equals("root.ARCHIVING_SG3")) {
+ int taskId = Integer.parseInt(resultSet.getString(1));
+ int ttl = Integer.parseInt(resultSet.getString(6));
+ assertFalse(checkTaskId.contains(taskId));
+ checkTaskId.add(taskId);
+ assertFalse(checkTTL.contains(ttl));
+ checkTTL.add(ttl);
+ }
+ }
+
+ assertEquals(1000, checkTaskId.size());
+ assertEquals(1000, checkTTL.size());
+
+ for (int i : checkTaskId) {
+ // test concurrent cancel
+ statement.execute(String.format("CANCEL ARCHIVING %d", i));
+ }
+
+ resultSet = statement.executeQuery("SHOW ALL ARCHIVING");
+
+ while (resultSet.next()) {
+ if (resultSet.getString(3).equals("root.ARCHIVING_SG3")) {
+ assertEquals("CANCELED", resultSet.getString(4));
+ }
+ }
+ }
+ }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e17a900a25..ba8ef4b390 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -275,6 +275,10 @@ timestamp_precision=ms
# Datatype: int
# concurrent_query_thread=16
+# How many threads can concurrently run archiving tasks. When <= 0, use default number of 2.
+# Datatype: int
+# archiving_thread_num=2
+
# How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
# Datatype: int
# concurrent_sub_rawQuery_thread=8
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index e94b5af126..1fb8168c6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -71,6 +71,8 @@ public enum ThreadName {
CLUSTER_DATA_HEARTBEAT_RPC_SERVICE("ClusterDataHeartbeatRPC"),
CLUSTER_DATA_HEARTBEAT_RPC_CLIENT("ClusterDataHeartbeatRPC-Client"),
Cluster_Monitor("ClusterMonitor"),
+ ARCHIVING_CHECK("Archiving-Check"),
+ ARCHIVING_TASK("Archiving-Task"),
;
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f4cfc65920..ae52a526a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -864,6 +864,9 @@ public class IoTDBConfig {
// The max record num returned in one schema query.
private int schemaQueryFetchSize = 10000000;
+ /** number of threads given to archiving tasks */
+ private int archivingThreadNum = 2;
+
// customizedProperties, this should be empty by default.
private Properties customizedProperties = new Properties();
@@ -2750,6 +2753,14 @@ public class IoTDBConfig {
this.schemaQueryFetchSize = schemaQueryFetchSize;
}
+ public int getArchivingThreadNum() {
+ return archivingThreadNum;
+ }
+
+ public void setArchivingThreadNum(int archivingThreadNum) {
+ this.archivingThreadNum = archivingThreadNum;
+ }
+
public double getWriteProportion() {
return writeProportion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 1294002707..f2e9aed088 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -92,6 +92,13 @@ public class IoTDBConstant {
public static final String COLUMN_LOCK_INFO = "lock holder";
public static final String COLUMN_TTL = "ttl";
+ public static final String COLUMN_TASK_ID = "task id";
+ public static final String COLUMN_SUBMIT_TIME = "submit time";
+ public static final String COLUMN_ARCHIVING_START_TIME = "start time";
+ public static final String COLUMN_ARCHIVING_STATUS = "status";
+ public static final String COLUMN_ARCHIVING_TARGET_DIRECTORY = "target directory";
+ public static final String COLUMN_EXPIRE_TIME = "expire time(ms)";
+
public static final String COLUMN_TASK_NAME = "task name";
public static final String COLUMN_CREATED_TIME = "created time";
public static final String COLUMN_PROGRESS = "progress";
@@ -154,6 +161,8 @@ public class IoTDBConstant {
public static final String EXT_FOLDER_NAME = "ext";
public static final String UDF_FOLDER_NAME = "udf";
public static final String TRIGGER_FOLDER_NAME = "trigger";
+ public static final String ARCHIVING_FOLDER_NAME = "archiving";
+ public static final String ARCHIVING_LOG_FOLDER_NAME = "archiving_task";
// Operation Sync folder name
public static final String OPERATION_SYNC_FOLDER_NAME = "operationsync";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 815113f6b4..a9c3fd99d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -481,6 +481,15 @@ public class IoTDBDescriptor {
conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setArchivingThreadNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "archiving_thread_num", Integer.toString(conf.getArchivingThreadNum()))));
+
+ if (conf.getArchivingThreadNum() <= 0) {
+ conf.setArchivingThreadNum(2);
+ }
+
conf.setRawQueryBlockingQueueCapacity(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6ef91ccb28..4c2226a4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.doublelive.OperationSyncLogService;
import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
import org.apache.iotdb.db.doublelive.OperationSyncProducer;
import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
+import org.apache.iotdb.db.engine.archiving.ArchivingOperate;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -169,6 +171,8 @@ public class StorageEngine implements IService {
ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
arrayListBlockQueue;
+ private ArchivingManager archivingManager = ArchivingManager.getInstance();
+
private StorageEngine() {
if (isEnableOperationSync) {
// Open OperationSync
@@ -587,6 +591,7 @@ public class StorageEngine implements IService {
shutdownTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
recoveryThreadPool.shutdownNow();
processorMap.clear();
+ archivingManager.close();
}
private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
@@ -1322,6 +1327,31 @@ public class StorageEngine implements IService {
}
}
+ /** push the archiving info to archivingManager */
+ public void setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+ boolean result = archivingManager.setArchiving(storageGroup, targetDir, ttl, startTime);
+ if (result) {
+ logger.info("set archiving task successfully.");
+ } else {
+ logger.info("set archiving task failed.");
+ }
+ }
+
+ public void operateArchiving(
+ ArchivingOperate.ArchivingOperateType operateType, long taskId, PartialPath storageGroup) {
+ if (taskId >= 0) {
+ archivingManager.operate(operateType, taskId);
+ } else if (storageGroup != null) {
+ archivingManager.operate(operateType, storageGroup);
+ } else {
+ logger.error("{} archiving cannot recognize taskId or storagegroup", operateType.name());
+ }
+ }
+
+ public ArchivingManager getArchivingManager() {
+ return archivingManager;
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
new file mode 100644
index 0000000000..9ac1dc3fc9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ArchivingManager keeps track of all ArchivingTasks and creates the threads that check and run
+ * them.
+ */
+public class ArchivingManager {
+ private static final Logger logger = LoggerFactory.getLogger(ArchivingManager.class);
+ protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private final ReentrantLock lock = new ReentrantLock();
+ // region lock resources
+ private ArchivingOperateWriter logWriter;
+ // Ordering for archivingTasks: dictionary order on (status, startTime, storageGroup, -ttl).
+ // Tasks that tie on all four keys are ordered by descending taskId, so a newer task still sorts
+ // before an older one, and the comparator returns 0 only when both task ids are equal, as the
+ // Comparator contract requires.
+ // NOTE(review): status is changed via setStatus() while tasks are stored in this set, so the
+ // order is only guaranteed at insertion time; the set is only iterated, never searched.
+ private final Set<ArchivingTask> archivingTasks =
+     new TreeSet<>(
+         (task1, task2) -> {
+           int statusDiff = task1.getStatus().compareTo(task2.getStatus());
+           if (statusDiff != 0) {
+             return statusDiff;
+           }
+           // Long.compare avoids the overflow of a plain long subtraction
+           int startTimeDiff = Long.compare(task1.getStartTime(), task2.getStartTime());
+           if (startTimeDiff != 0) {
+             return startTimeDiff;
+           }
+           int storageGroupDiff = task1.getStorageGroup().compareTo(task2.getStorageGroup());
+           if (storageGroupDiff != 0) {
+             return storageGroupDiff;
+           }
+           // larger ttl first
+           int ttlDiff = Long.compare(task2.getTTL(), task1.getTTL());
+           if (ttlDiff != 0) {
+             return ttlDiff;
+           }
+           return Long.compare(task2.getTaskId(), task1.getTaskId());
+         });
+ // the current largest ArchivingTask id + 1, used to create new tasks
+ private long currentTaskId = 0;
+ // endregion
+
+ // single thread to iterate through archivingTasks and check start
+ private ScheduledExecutorService archivingTaskCheckThread;
+ // multiple threads to run the tasks
+ private ExecutorService archivingTaskThreadPool;
+
+ private boolean initialized = false;
+ private static final long ARCHIVING_CHECK_INTERVAL = 60 * 1000L;
+
+ private static final File LOG_FILE =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ "log.bin")
+ .toString());
+ private static final File ARCHIVING_LOG_DIR =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+ .toString());
+
+ // Holder for the singleton; INSTANCE is created when this nested class is initialized.
+ private static class ArchivingManagerHolder {
+   private static final ArchivingManager INSTANCE = new ArchivingManager();
+
+   private ArchivingManagerHolder() {}
+ }
+
+ /** @return the single ArchivingManager instance */
+ public static ArchivingManager getInstance() {
+   return ArchivingManagerHolder.INSTANCE;
+ }
+
+ public void init() {
+ try {
+ lock.lock();
+
+ if (initialized) {
+ return;
+ }
+
+ // create necessary log files/dirs
+ if (ARCHIVING_LOG_DIR == null) logger.error("ARCHIVING_LOG_DIR is null");
+ if (!ARCHIVING_LOG_DIR.exists()) {
+ if (ARCHIVING_LOG_DIR.mkdirs()) {
+ logger.info("ARCHIVING_LOG_DIR {} created successfully", ARCHIVING_LOG_DIR);
+ } else {
+ logger.error("ARCHIVING_LOG_DIR {} create error", ARCHIVING_LOG_DIR);
+ }
+ }
+ if (!ARCHIVING_LOG_DIR.isDirectory())
+ logger.error("{} already exists but is not directory", ARCHIVING_LOG_DIR);
+ if (!LOG_FILE.getParentFile().exists()) LOG_FILE.getParentFile().mkdirs();
+ if (!LOG_FILE.exists()) {
+ try {
+ LOG_FILE.createNewFile();
+ } catch (IOException e) {
+ logger.error("{} log file could not be created", LOG_FILE.getName());
+ }
+ }
+
+ // recover
+ ArchivingRecover recover = new ArchivingRecover();
+ recover.recover();
+ this.archivingTasks.addAll(recover.getArchivingTasks());
+ this.currentTaskId = recover.getCurrentTaskId();
+
+ try {
+ logWriter = new ArchivingOperateWriter(LOG_FILE);
+ } catch (FileNotFoundException e) {
+ logger.error("Cannot find/create log for archiving.");
+ return;
+ }
+
+ archivingTaskCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.ARCHIVING_CHECK.getName());
+ archivingTaskCheckThread.scheduleAtFixedRate(
+ this::checkArchivingTasks,
+ ARCHIVING_CHECK_INTERVAL,
+ ARCHIVING_CHECK_INTERVAL,
+ TimeUnit.MILLISECONDS);
+
+ archivingTaskThreadPool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ config.getArchivingThreadNum(), ThreadName.ARCHIVING_TASK.getName());
+ logger.info("start archiving check thread successfully.");
+ initialized = true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+  * Closes all resources used: shuts down the check thread and the task pool, closes the log
+  * writer, closes every task and resets the in-memory state.
+  *
+  * <p>Safe to call when {@link #init()} never ran or returned early; resources that were never
+  * created are skipped.
+  */
+ public void close() {
+   // the task set and the log writer are lock-guarded resources, so hold the lock while closing
+   lock.lock();
+   try {
+     initialized = false;
+     if (archivingTaskCheckThread != null) {
+       archivingTaskCheckThread.shutdown();
+     }
+     if (archivingTaskThreadPool != null) {
+       archivingTaskThreadPool.shutdown();
+     }
+
+     if (logWriter != null) {
+       try {
+         logWriter.close();
+       } catch (Exception e) {
+         logger.error("Cannot close archiving log writer, because:", e);
+       }
+     }
+
+     for (ArchivingTask task : archivingTasks) {
+       task.close();
+     }
+     archivingTasks.clear();
+     currentTaskId = 0;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Creates a copy of archivingTasks and returns it.
+  *
+  * @return a new list holding the current tasks in the iteration order of the task set
+  */
+ public List<ArchivingTask> getArchivingTasks() {
+   // acquire outside the try block so that finally never unlocks a lock this thread does not hold
+   lock.lock();
+   try {
+     return new ArrayList<>(archivingTasks);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+ * add archiving task to archivingTasks
+ *
+ * @return true if set successful, false if exists duplicates or unsuccessful
+ */
+ public boolean setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+ try {
+ lock.lock();
+
+ // check if there are duplicates
+ for (ArchivingTask archivingTask : archivingTasks) {
+ if (archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())
+ && archivingTask.getTargetDir().equals(targetDir)
+ && archivingTask.getTTL() == ttl
+ && archivingTask.getStartTime() == startTime) {
+ logger.info("archiving task already equals archiving task {}", archivingTask.getTaskId());
+ return false;
+ }
+ }
+
+ ArchivingTask newTask =
+ new ArchivingTask(currentTaskId, storageGroup, targetDir, ttl, startTime);
+ try {
+ logWriter.log(ArchivingOperate.ArchivingOperateType.SET, newTask);
+ } catch (IOException e) {
+ logger.error("write log error");
+ return false;
+ }
+ archivingTasks.add(newTask);
+ currentTaskId++;
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+  * Maps an operate type to the task status that results from applying it.
+  *
+  * @return the status after operate archivingOperateType, or null for a type without a mapping
+  *     (SET)
+  */
+ private ArchivingTask.ArchivingTaskStatus statusFromOperateType(
+     ArchivingOperate.ArchivingOperateType archivingOperateType) {
+   ArchivingTask.ArchivingTaskStatus resultingStatus = null;
+   switch (archivingOperateType) {
+     case RESUME:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.READY;
+       break;
+     case CANCEL:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.CANCELED;
+       break;
+     case START:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.RUNNING;
+       break;
+     case PAUSE:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.PAUSED;
+       break;
+     case FINISHED:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.FINISHED;
+       break;
+     case ERROR:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.ERROR;
+       break;
+     default:
+       break;
+   }
+   return resultingStatus;
+ }
+
+ /**
+  * Operate on task (pause, cancel, resume, etc), selecting the task by id.
+  *
+  * @param archivingOperateType the operator on task
+  * @param taskId taskId of ArchivingTask to operate on
+  * @return true if exists task with taskId and operate successfully
+  */
+ public boolean operate(ArchivingOperate.ArchivingOperateType archivingOperateType, long taskId) {
+   // acquire outside the try block so that finally never unlocks a lock this thread does not hold
+   lock.lock();
+   try {
+     // the first task with a matching id is the one operated on
+     for (ArchivingTask archivingTask : archivingTasks) {
+       if (archivingTask.getTaskId() == taskId) {
+         return operate(archivingOperateType, archivingTask);
+       }
+     }
+     // no matches
+     return false;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Operate on task (pause, cancel, resume, etc), selecting the task by storage group.
+  *
+  * <p>If several tasks share the storage group, only the first one in the iteration order of the
+  * task set is operated on.
+  *
+  * @param archivingOperateType the operator on task
+  * @param storageGroup StorageGroup of ArchivingTask to operate on
+  * @return true if exists task with storageGroup and operate successfully
+  */
+ public boolean operate(
+     ArchivingOperate.ArchivingOperateType archivingOperateType, PartialPath storageGroup) {
+   // acquire outside the try block so that finally never unlocks a lock this thread does not hold
+   lock.lock();
+   try {
+     for (ArchivingTask archivingTask : archivingTasks) {
+       if (archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())) {
+         return operate(archivingOperateType, archivingTask);
+       }
+     }
+     // no matches
+     return false;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Applies PAUSE, CANCEL or RESUME to the given task: validates the current status, writes the
+  * operate to the log and then updates the status in memory. Every other operate type is
+  * rejected.
+  *
+  * @return true if the operate was logged and applied, false if it is not allowed for the task's
+  *     current status, is not a user operate, or the log write failed
+  */
+ private boolean operate(
+     ArchivingOperate.ArchivingOperateType archivingOperateType, ArchivingTask task) {
+   ArchivingTask.ArchivingTaskStatus current = task.getStatus();
+   switch (archivingOperateType) {
+     case CANCEL:
+     case PAUSE:
+       // can cancel/pause only when status=READY/RUNNING
+       if (current != ArchivingTask.ArchivingTaskStatus.READY
+           && current != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+         return false;
+       }
+       break;
+     case RESUME:
+       // can resume only when status=PAUSED
+       if (current != ArchivingTask.ArchivingTaskStatus.PAUSED) {
+         return false;
+       }
+       break;
+     default:
+       // SET, START, FINISHED and ERROR cannot be requested through this method
+       return false;
+   }
+
+   // write to log before touching the in-memory status
+   try {
+     logWriter.log(archivingOperateType, task);
+   } catch (IOException e) {
+     logger.error("write log error");
+     return false;
+   }
+   task.setStatus(statusFromOperateType(archivingOperateType));
+   return true;
+ }
+
+ /** check if any of the archivingTasks can start */
+ public void checkArchivingTasks() {
+ try {
+ lock.lock();
+
+ logger.info("checking archivingTasks");
+ for (ArchivingTask task : archivingTasks) {
+
+ if (task.getStartTime() - DatetimeUtils.currentTime() <= 0
+ && task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY) {
+
+ // storage group has no data
+ if (!StorageEngine.getInstance().getProcessorMap().containsKey(task.getStorageGroup())) {
+ return;
+ }
+
+ // set task to running
+ task.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+
+ // push check archivingTask to storageGroupManager, use Runnable to give task to thread
+ // pool
+ archivingTaskThreadPool.execute(
+ () -> {
+ try {
+ logWriter.log(ArchivingOperate.ArchivingOperateType.START, task);
+ task.startTask();
+ } catch (IOException e) {
+ logger.error("write log error");
+ task.setStatus(ArchivingTask.ArchivingTaskStatus.ERROR);
+ return;
+ }
+
+ StorageEngine.getInstance()
+ .getProcessorMap()
+ .get(task.getStorageGroup())
+ .checkArchivingTask(task);
+ logger.info("check archiving task successfully.");
+
+ // set state and remove
+ try {
+ logWriter.log(ArchivingOperate.ArchivingOperateType.FINISHED, task);
+ task.finish();
+ } catch (IOException e) {
+ logger.error("write log error");
+ task.setStatus(ArchivingTask.ArchivingTaskStatus.ERROR);
+ return;
+ }
+ task.setStatus(ArchivingTask.ArchivingTaskStatus.FINISHED);
+ });
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // test
+ public void setCheckThreadTime(long checkThreadTime) {
+ archivingTaskCheckThread.shutdown();
+
+ archivingTaskCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.ARCHIVING_CHECK.getName());
+ archivingTaskCheckThread.scheduleAtFixedRate(
+ this::checkArchivingTasks, checkThreadTime, checkThreadTime, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java
new file mode 100644
index 0000000000..cbe91e82e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperate.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * ArchivingOperate pairs an ArchivingOperateType (SET, CANCEL, PAUSE, etc) with an ArchivingTask
+ * and knows how to write itself to, and read itself from, the archiving log.
+ */
+public class ArchivingOperate {
+  private final ArchivingOperateType type;
+  private final ArchivingTask task;
+
+  /** Creates an operate that holds its own copy of the given task. */
+  public ArchivingOperate(ArchivingOperateType type, ArchivingTask task) {
+    this.type = type;
+    this.task = new ArchivingTask(task);
+  }
+
+  /** Creates an operate whose task is built from the task id alone. */
+  public ArchivingOperate(ArchivingOperateType type, long taskId) {
+    this.type = type;
+    this.task = new ArchivingTask(taskId);
+  }
+
+  public ArchivingOperateType getType() {
+    return type;
+  }
+
+  public ArchivingTask getTask() {
+    return task;
+  }
+
+  /**
+   * Writes this operate to the stream and flushes it.
+   *
+   * <p>Layout: one byte with the type ordinal, the task id, and, for SET only, the storage group
+   * path, target directory path, start time, ttl and submit time.
+   *
+   * @param logFileOutStream stream to append to
+   * @throws IOException if writing or flushing fails
+   */
+  public void serialize(FileOutputStream logFileOutStream) throws IOException {
+    int typeNum = type.ordinal();
+    ReadWriteIOUtils.write((byte) typeNum, logFileOutStream);
+    ReadWriteIOUtils.write(task.getTaskId(), logFileOutStream);
+
+    if (type == ArchivingOperateType.SET) {
+      ReadWriteIOUtils.write(task.getStorageGroup().getFullPath(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getTargetDir().getPath(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getStartTime(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getTTL(), logFileOutStream);
+      ReadWriteIOUtils.write(task.getSubmitTime(), logFileOutStream);
+    }
+
+    logFileOutStream.flush();
+  }
+
+  /**
+   * Reads one operate from the stream, mirroring the layout written by serialize.
+   *
+   * @param logFileInStream stream positioned at the start of an operate
+   * @return the operate that was read
+   * @throws IOException if the type byte is not a valid ordinal or reading fails
+   * @throws IllegalPathException if the stored storage group path is rejected by PartialPath
+   */
+  public static ArchivingOperate deserialize(FileInputStream logFileInStream)
+      throws IOException, IllegalPathException {
+    int typeNum = ReadWriteIOUtils.readByte(logFileInStream);
+    ArchivingOperateType[] knownTypes = ArchivingOperateType.values();
+    if (typeNum < 0 || typeNum >= knownTypes.length) {
+      throw new IOException("Invalid archiving operate type: " + typeNum);
+    }
+    ArchivingOperateType operateType = knownTypes[typeNum];
+
+    long taskId = ReadWriteIOUtils.readLong(logFileInStream);
+
+    ArchivingTask deserializedTask;
+    if (operateType == ArchivingOperateType.SET) {
+      PartialPath storageGroup = new PartialPath(ReadWriteIOUtils.readString(logFileInStream));
+      String targetDirPath = ReadWriteIOUtils.readString(logFileInStream);
+      File targetDir = FSFactoryProducer.getFSFactory().getFile(targetDirPath);
+      long startTime = ReadWriteIOUtils.readLong(logFileInStream);
+      long ttl = ReadWriteIOUtils.readLong(logFileInStream);
+      long submitTime = ReadWriteIOUtils.readLong(logFileInStream);
+
+      deserializedTask =
+          new ArchivingTask(taskId, storageGroup, targetDir, ttl, startTime, submitTime);
+    } else {
+      deserializedTask = new ArchivingTask(taskId);
+    }
+    return new ArchivingOperate(operateType, deserializedTask);
+  }
+
+  /**
+   * Kinds of operate. The ordinal is what serialize writes to the log, so the declaration order
+   * must not change.
+   */
+  public enum ArchivingOperateType {
+    SET,
+    CANCEL,
+    START,
+    PAUSE,
+    RESUME,
+    FINISHED,
+    ERROR
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java
new file mode 100644
index 0000000000..afa122627e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * ArchivingOperateReader reads binarized ArchivingOperate from file using FileInputStream from head
+ * to tail.
+ */
+public class ArchivingOperateReader implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(ArchivingOperateReader.class);
+ private final File logFile;
+ private FileInputStream logFileInStream;
+ private ArchivingOperate operate;
+ private long unbrokenLogsSize = 0;
+
+ public ArchivingOperateReader(String logFilePath) throws IOException {
+ this.logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+ logFileInStream = new FileInputStream(logFile);
+ }
+
+ public ArchivingOperateReader(File logFile) throws IOException {
+ this.logFile = logFile;
+ logFileInStream = new FileInputStream(logFile);
+ }
+
+ /** @return ArchivingOperate parsed from log file, null if nothing left in file */
+ private ArchivingOperate readOperate() {
+ try {
+ ArchivingOperate log = ArchivingOperate.deserialize(logFileInStream);
+
+ unbrokenLogsSize = logFileInStream.getChannel().position();
+ return log;
+ } catch (IllegalPathException | IOException e) {
+ return null;
+ }
+ }
+
+ public ArchivingOperate next() {
+ ArchivingOperate ret = operate;
+ operate = null;
+ return ret;
+ }
+
+ public boolean hasNext() {
+ if (operate != null) {
+ return true;
+ }
+
+ // try reading
+ operate = readOperate();
+
+ if (operate == null) {
+ truncateBrokenLogs();
+ operate = null;
+ return false;
+ }
+ return true;
+ }
+
+ /** Keeps 0...unbrokenLogSize bytes of the Log File and discards the rest */
+ private void truncateBrokenLogs() {
+ try (FileOutputStream outputStream = new FileOutputStream(logFile, true);
+ FileChannel channel = outputStream.getChannel()) {
+ channel.truncate(unbrokenLogsSize);
+ } catch (IOException e) {
+ logger.error("Fail to truncate log file to size {}", unbrokenLogsSize, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ logFileInStream.close();
+ } catch (IOException e) {
+ logger.error("Failed to close archiving log");
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java
new file mode 100644
index 0000000000..01d4ccd60b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.logfile.MLogTxtWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * ArchivingOperateWriter writes the binary logs of ArchivingOperate into file using
+ * FileOutputStream
+ */
+public class ArchivingOperateWriter implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(ArchivingOperateWriter.class);
+  private final File logFile;
+  private final FileOutputStream logFileOutStream;
+
+  public ArchivingOperateWriter(String logFileName) throws FileNotFoundException {
+    this(SystemFileFactory.INSTANCE.getFile(logFileName));
+  }
+
+  /**
+   * Opens the log file in append mode, creating its parent directory first if it is missing.
+   *
+   * @param logFile the log file to append to
+   * @throws FileNotFoundException if the file cannot be opened for writing
+   */
+  public ArchivingOperateWriter(File logFile) throws FileNotFoundException {
+    this.logFile = logFile;
+    File parentDir = logFile.getParentFile();
+    // mkdirs() also returns false for a directory that already exists, so only call it (and only
+    // report a failure) when the directory is really missing
+    if (!logFile.exists() && parentDir != null && !parentDir.exists()) {
+      if (parentDir.mkdirs()) {
+        logger.info("created archiving log folder");
+      } else {
+        logger.warn("create archiving log folder failed");
+      }
+    }
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }
+
+  /**
+   * Appends one operate to the log. A SET record carries the task itself; every other type is
+   * recorded with the task id only.
+   *
+   * @param type the operate type to record
+   * @param task the task the operate applies to
+   * @throws IOException if writing to the log file fails
+   */
+  public void log(ArchivingOperate.ArchivingOperateType type, ArchivingTask task)
+      throws IOException {
+    ArchivingOperate operate;
+    if (type == ArchivingOperate.ArchivingOperateType.SET) {
+      operate = new ArchivingOperate(type, task);
+    } else {
+      operate = new ArchivingOperate(type, task.getTaskId());
+    }
+    operate.serialize(logFileOutStream);
+  }
+
+  @Override
+  public void close() throws Exception {
+    logFileOutStream.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java
new file mode 100644
index 0000000000..5391e4e7cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingRecover.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ArchivingRecover class encapsulates the recover logic, it retrieves the ArchivingTasks and
+ * finishes archiving the tsfiles.
+ */
+public class ArchivingRecover {
+ private static final Logger logger = LoggerFactory.getLogger(ArchivingRecover.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final File LOG_FILE =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ "log.bin")
+ .toString());
+ private static final File ARCHIVING_LOG_DIR =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+ .toString());
+ private Map<Long, ArchivingTask> archivingTasks;
+ private long currentTaskId = 0;
+
+ /**
+  * Rebuilds the archiving tasks from the operate log.
+  *
+  * <p>Replays every operate in LOG_FILE onto an in-memory map of tasks. A task with a START
+  * record that is not followed by PAUSE, FINISHED or ERROR is treated as interrupted: an ERROR
+  * record is appended to the log and the task is set to ERROR. Unfinished tsfile moves are
+  * replayed through recoverArchivingFiles() before and after the log is read.
+  *
+  * @return the recovered tasks; finished tasks are not included
+  */
+ public List<ArchivingTask> recover() {
+   // first recover archiving tsfiles
+   recoverArchivingFiles();
+
+   archivingTasks = new HashMap<>();
+   currentTaskId = 0;
+
+   // read from logReader
+   try (ArchivingOperateReader logReader = new ArchivingOperateReader(LOG_FILE);
+       ArchivingOperateWriter logWriter = new ArchivingOperateWriter(LOG_FILE)) {
+     Set<Long> errorSet = new HashSet<>();
+
+     while (logReader.hasNext()) {
+       ArchivingOperate operate = logReader.next();
+       long taskId = operate.getTask().getTaskId();
+
+       switch (operate.getType()) {
+         case SET:
+           setArchivingFromLog(operate.getTask());
+           break;
+         case CANCEL:
+           // NOTE(review): CANCEL does not remove the task from errorSet, so a task that was
+           // started and then canceled ends up in ERROR - confirm this is intended
+           operateFromLog(ArchivingOperate.ArchivingOperateType.CANCEL, taskId);
+           break;
+         case START:
+           // if task started but didn't finish, then error occurred
+           errorSet.add(taskId);
+           break;
+         case PAUSE:
+           errorSet.remove(taskId);
+           operateFromLog(ArchivingOperate.ArchivingOperateType.PAUSE, taskId);
+           break;
+         case RESUME:
+           operateFromLog(ArchivingOperate.ArchivingOperateType.RESUME, taskId);
+           break;
+         case FINISHED:
+           // finished task => remove from list and remove from potential error task
+           errorSet.remove(taskId);
+           archivingTasks.remove(taskId);
+           operateFromLog(ArchivingOperate.ArchivingOperateType.FINISHED, taskId);
+           break;
+         case ERROR:
+           // already put error in log
+           errorSet.remove(taskId);
+           operateFromLog(ArchivingOperate.ArchivingOperateType.ERROR, taskId);
+           break;
+         default:
+           logger.error("read archiving log: unknown type");
+       }
+     }
+
+     // for each task in errorSet, the task started but didn't finish (an error)
+     for (long errTaskId : errorSet) {
+       if (archivingTasks.containsKey(errTaskId)) {
+         // write to log and set task in ERROR in memory
+         logWriter.log(ArchivingOperate.ArchivingOperateType.ERROR, archivingTasks.get(errTaskId));
+         operateFromLog(ArchivingOperate.ArchivingOperateType.ERROR, errTaskId);
+       } else {
+         logger.error("unknown error taskId");
+       }
+     }
+   } catch (Exception e) {
+     // keep whatever was recovered so far, but record why reading stopped
+     logger.error("Cannot read log for archiving.", e);
+   }
+
+   recoverArchivingFiles();
+
+   return new ArrayList<>(archivingTasks.values());
+ }
+
+ /** add archiving task to archivingTasks from log, does not write to log */
+ public void setArchivingFromLog(ArchivingTask newTask) {
+ if (currentTaskId > newTask.getTaskId()) {
+ logger.error("set archiving error, current index larger than log index");
+ }
+
+ archivingTasks.put(newTask.getTaskId(), newTask);
+ currentTaskId = newTask.getTaskId() + 1;
+ }
+
+ /**
+  * Maps an operate type to the task status that results from applying it.
+  *
+  * @return the resulting status, or null for a type without a mapping (SET)
+  */
+ private ArchivingTask.ArchivingTaskStatus statusFromOperateType(
+     ArchivingOperate.ArchivingOperateType archivingOperateType) {
+   ArchivingTask.ArchivingTaskStatus resultingStatus = null;
+   switch (archivingOperateType) {
+     case RESUME:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.READY;
+       break;
+     case CANCEL:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.CANCELED;
+       break;
+     case START:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.RUNNING;
+       break;
+     case PAUSE:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.PAUSED;
+       break;
+     case FINISHED:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.FINISHED;
+       break;
+     case ERROR:
+       resultingStatus = ArchivingTask.ArchivingTaskStatus.ERROR;
+       break;
+     default:
+       break;
+   }
+   return resultingStatus;
+ }
+
+ /** operate Archiving to archivingTasks from log, does not write to log */
+ public boolean operateFromLog(ArchivingOperate.ArchivingOperateType operateType, long taskId) {
+ if (archivingTasks.containsKey(taskId)) {
+ archivingTasks.get(taskId).setStatus(statusFromOperateType(operateType));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+  * Finishes the unfinished ArchivingTask using log files under ARCHIVING_LOG_DIR.
+  *
+  * <p>Each log file is replayed on its own; a log file is deleted only after it has been replayed
+  * completely. A log file that cannot be replayed is kept and does not stop the remaining log
+  * files from being processed.
+  */
+ public void recoverArchivingFiles() {
+   File[] archivingLogFiles = ARCHIVING_LOG_DIR.listFiles();
+   if (archivingLogFiles == null) {
+     // listFiles() returns null when the path is not a directory or cannot be read
+     return;
+   }
+   for (File logFile : archivingLogFiles) {
+     if (recoverArchivingFile(logFile)) {
+       logFile.delete();
+     }
+   }
+ }
+
+ /**
+  * Replays one archiving log file: reads the target directory followed by tsfile paths, and
+  * archives every listed tsfile into the target directory.
+  *
+  * @return true if the log file was replayed and closed successfully and can be deleted
+  */
+ private boolean recoverArchivingFile(File logFile) {
+   // try-with-resources closes the stream on every exit path
+   try (FileInputStream logFileInput = new FileInputStream(logFile)) {
+     String targetDirPath = ReadWriteIOUtils.readString(logFileInput);
+     File targetDir = SystemFileFactory.INSTANCE.getFile(targetDirPath);
+     String tsfilePath = ReadWriteIOUtils.readString(logFileInput);
+
+     if (targetDir.exists()) {
+       if (!targetDir.isDirectory()) {
+         logger.error("target dir {} not a directory", targetDirPath);
+         return false;
+       }
+     } else if (!targetDir.mkdirs()) {
+       logger.error("create target dir {} failed", targetDirPath);
+       return false;
+     }
+
+     while (tsfilePath != null && !tsfilePath.isEmpty()) {
+       File tsfile = SystemFileFactory.INSTANCE.getFile(tsfilePath);
+
+       TsFileResource resource = new TsFileResource(tsfile);
+       resource.archive(targetDir);
+
+       try {
+         tsfilePath = ReadWriteIOUtils.readString(logFileInput);
+       } catch (IOException e) {
+         // finished reading all tsfile paths
+         break;
+       }
+     }
+     return true;
+   } catch (IOException e) {
+     // could not open, read the header of, or close the log file; keep it for a later attempt
+     logger.error("ArchivingRecover: cannot recover from log file {}", logFile, e);
+     return false;
+   }
+ }
+
+ /** @return a new list holding the tasks currently in archivingTasks */
+ public List<ArchivingTask> getArchivingTasks() {
+   List<ArchivingTask> snapshot = new ArrayList<>(archivingTasks.values());
+   return snapshot;
+ }
+
+ /** @return the id following the id of the task most recently registered from the log, 0 if none */
+ public long getCurrentTaskId() {
+   return this.currentTaskId;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
new file mode 100644
index 0000000000..4973e3bbb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+
+/** Class for each Archiving Task */
+public class ArchivingTask {
+ private long taskId;
+ private PartialPath storageGroup;
+ private File targetDir;
+ private long startTime;
+ private long ttl;
+ private volatile ArchivingTaskStatus status = ArchivingTaskStatus.READY;
+ private long submitTime;
+
+ private static final Logger logger = LoggerFactory.getLogger(ArchivingTask.class);
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ FileOutputStream logFileOutput = null;
+ private static final File ARCHIVING_LOG_DIR =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+ .toString());
+
+ public ArchivingTask(long taskId) {
+ this.taskId = taskId;
+ }
+
+ public ArchivingTask(ArchivingTask task) {
+ this.taskId = task.getTaskId();
+ this.storageGroup = task.getStorageGroup();
+ this.targetDir = task.getTargetDir();
+ this.ttl = task.getTTL();
+ this.startTime = task.getStartTime();
+ this.submitTime = task.getSubmitTime();
+ }
+
+ public ArchivingTask(
+ long taskId, PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+ this.taskId = taskId;
+ this.storageGroup = storageGroup;
+ this.targetDir = targetDir;
+ this.ttl = ttl;
+ this.startTime = startTime;
+ this.submitTime = DatetimeUtils.currentTime();
+ }
+
+ public ArchivingTask(
+ long taskId,
+ PartialPath storageGroup,
+ File targetDir,
+ long ttl,
+ long startTime,
+ long submitTime) {
+ this.taskId = taskId;
+ this.storageGroup = storageGroup;
+ this.targetDir = targetDir;
+ this.ttl = ttl;
+ this.startTime = startTime;
+ this.submitTime = submitTime;
+ }
+
+ /**
+ * started the archiving task, write to log
+ *
+ * @return true if write log successful, false otherwise
+ */
+ public boolean startTask() throws IOException {
+ File logFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, taskId + ".log");
+ if (logFile.exists()) {
+ // want an empty log file
+ logFile.delete();
+ }
+ if (!logFile.createNewFile()) {
+ // log file doesn't exist but cannot be created
+ return false;
+ }
+
+ logFileOutput = new FileOutputStream(logFile);
+
+ ReadWriteIOUtils.write(targetDir.getAbsolutePath(), logFileOutput);
+ logFileOutput.flush();
+
+ return true;
+ }
+
+ /**
+ * started archiving tsfile and its resource/mod files
+ *
+ * @return true if write log successful, false otherwise
+ */
+ public boolean startFile(File tsfile) throws IOException {
+ if (logFileOutput == null) {
+ logger.error("need to run ArchivingTask.startTask before ArchivingTask.start");
+ return false;
+ }
+
+ ReadWriteIOUtils.write(tsfile.getAbsolutePath(), logFileOutput);
+ logFileOutput.flush();
+
+ return true;
+ }
+
+ /** finished archiving task, deletes logs and closes FileOutputStream */
+ public void finish() {
+ File logFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, taskId + ".log");
+ if (logFile.exists()) {
+ logFile.delete();
+ }
+ this.close();
+ }
+
+ /** release all resources */
+ public void close() {
+ try {
+ if (logFileOutput != null) {
+ logFileOutput.close();
+ logFileOutput = null;
+ }
+ } catch (IOException e) {
+ logger.error("could not close fileoutputstream for task {}", taskId);
+ }
+ }
+
+ // getter and setter functions
+
+ public long getTaskId() {
+ return taskId;
+ }
+
+ public PartialPath getStorageGroup() {
+ return storageGroup;
+ }
+
+ public File getTargetDir() {
+ return targetDir;
+ }
+
+ public long getTTL() {
+ return ttl;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public ArchivingTaskStatus getStatus() {
+ return status;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public void setStatus(ArchivingTaskStatus status) {
+ this.status = status;
+ }
+
+ public enum ArchivingTaskStatus {
+ READY,
+ RUNNING,
+ PAUSED,
+ CANCELED,
+ ERROR,
+ FINISHED
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index bce000b73d..6a991d2446 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -535,6 +535,35 @@ public class TsFileResource {
return true;
}
+ /**
+ * Move its data file, resource file, and modification file physically.
+ *
+ * @return moved data file
+ */
+ public File archive(File targetDir) {
+ // get the resource and mod files
+ File resourceFile = fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX);
+ File modFile = fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX);
+
+ // get the target file locations
+ File archivedFile = fsFactory.getFile(targetDir, file.getName());
+ File archivedResourceFile = fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX);
+ File archivedModificationFile =
+ fsFactory.getFile(targetDir, file.getName() + ModificationFile.FILE_SUFFIX);
+
+ // move
+ if (file.exists()) {
+ fsFactory.moveFile(file, archivedFile);
+ }
+ if (resourceFile.exists()) {
+ fsFactory.moveFile(resourceFile, archivedResourceFile);
+ }
+ if (modFile.exists()) {
+ fsFactory.moveFile(modFile, archivedModificationFile);
+ }
+ return archivedFile;
+ }
+
void moveTo(File targetDir) {
fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
fsFactory.moveFile(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
index 42eaf481d3..3c7a0d960d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
@@ -23,5 +23,6 @@ public enum TsFileResourceStatus {
CLOSED,
COMPACTION_CANDIDATE,
COMPACTING,
- DELETED
+ DELETED,
+ ARCHIVED
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index e0bac29972..b67bcae731 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.SystemStatus;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverManager;
@@ -1550,6 +1551,80 @@ public class VirtualStorageGroupProcessor {
}
}
+ /**
+  * Walks over snapshots of the sequence and unsequence TsFile lists and passes every file to
+  * {@link #checkArchivingTaskFile}, which moves it to the task's target directory when it is out
+  * of ttl. Stops as soon as the task is no longer RUNNING; does nothing when the task's ttl is
+  * {@code Long.MAX_VALUE}.
+  */
+ public void checkArchivingTask(ArchivingTask task) {
+   long taskTtl = task.getTTL();
+   if (taskTtl == Long.MAX_VALUE) {
+     logger.debug(
+         "{}: Archiving ttl not set, ignore the check",
+         logicalStorageGroupName + "-" + virtualStorageGroupId);
+     return;
+   }
+
+   long ttlLowerBound = System.currentTimeMillis() - taskTtl;
+   logger.debug(
+       "{}: Archiving files before {}",
+       logicalStorageGroupName + "-" + virtualStorageGroupId,
+       new Date(ttlLowerBound));
+
+   // both snapshots are taken up front to avoid concurrent modification of deletion
+   List<TsFileResource> seqSnapshot = new ArrayList<>(tsFileManager.getTsFileList(true));
+   List<TsFileResource> unseqSnapshot = new ArrayList<>(tsFileManager.getTsFileList(false));
+
+   if (archiveWhileRunning(task, seqSnapshot, ttlLowerBound, true)) {
+     archiveWhileRunning(task, unseqSnapshot, ttlLowerBound, false);
+   }
+ }
+
+ /**
+  * Checks every file of {@code files} against the archiving task.
+  *
+  * @return false if the task stopped running (eg. the task is paused) before all files were
+  *     checked, true otherwise
+  */
+ private boolean archiveWhileRunning(
+     ArchivingTask task, List<TsFileResource> files, long ttlLowerBound, boolean isSeq) {
+   for (TsFileResource candidate : files) {
+     if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+       return false;
+     }
+     checkArchivingTaskFile(task, candidate, task.getTargetDir(), ttlLowerBound, isSeq);
+   }
+   return true;
+ }
+
+
+ /**
+  * Archives a single TsFile to {@code targetDir}. The file is only handled when it is closed and
+  * is either marked deleted or no longer alive with respect to {@code ttlLowerBound}; it is then
+  * marked ARCHIVED and, if its write lock can be acquired, removed from the TsFile manager,
+  * recorded in the task log and physically moved.
+  *
+  * @param task archiving task whose log records the file before it is moved
+  * @param resource the TsFile to check
+  * @param targetDir directory the file is moved into
+  * @param ttlLowerBound time bound passed to {@code resource.stillLives}
+  * @param isSeq whether {@code resource} belongs to the sequence file list
+  */
+ public void checkArchivingTaskFile(
+     ArchivingTask task,
+     TsFileResource resource,
+     File targetDir,
+     long ttlLowerBound,
+     boolean isSeq) {
+   writeLock("checkArchivingLock");
+   try {
+     if (!resource.isClosed() || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
+       return;
+     }
+
+     // NOTE(review): the status is set before the lock attempt, so a resource whose write lock
+     // cannot be acquired stays ARCHIVED without being moved - confirm this is intended
+     resource.setStatus(TsFileResourceStatus.ARCHIVED);
+
+     // ensure that the file is not used by any queries
+     if (resource.tryWriteLock()) {
+       try {
+         // NOTE(review): the resource leaves the manager before the log write; if startFile
+         // fails the file stays on disk but is no longer tracked - confirm against callers
+         tsFileManager.remove(resource, isSeq);
+
+         // start archiving file
+         if (task.startFile(resource.getTsFile())) {
+           resource.archive(targetDir);
+         } else {
+           // archive file couldn't start
+           logger.error("{} archiving logger error", resource.getTsFilePath());
+         }
+       } catch (IOException e) {
+         // keep the cause so the failure can be diagnosed from the log
+         logger.error("{} archiving error", resource.getTsFilePath(), e);
+       } finally {
+         resource.writeUnlock();
+       }
+     }
+   } finally {
+     writeUnlock();
+   }
+ }
+
+
public void timedFlushUnseqMemTable() {
writeLock("timedFlushUnseqMemTable");
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index fb864ac89f..1d41f41fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.storagegroup.virtualSg;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
@@ -119,6 +120,21 @@ public class StorageGroupManager {
}
}
+ /** push check archiving to all virtual storage group processors */
+ public void checkArchivingTask(ArchivingTask task) {
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.RUNNING) {
+ // task stopped running (eg. the task is paused), return
+ return;
+ }
+
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.checkArchivingTask(task);
+ }
+ }
+ }
+
/** push check sequence memtable flush interval down to all sg */
public void timedFlushSeqMemTable() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 8c26ec903c..348773cec9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.SystemStatus;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingOperate;
+import org.apache.iotdb.db.engine.archiving.ArchivingTask;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
@@ -107,12 +109,15 @@ import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
@@ -128,6 +133,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -179,6 +185,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -193,6 +200,9 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_START_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_STATUS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ARCHIVING_TARGET_DIRECTORY;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
@@ -204,6 +214,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUE
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_EXPIRE_TIME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
@@ -213,6 +224,8 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_SCHEMA_TEMPLATE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_SUBMIT_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_ID;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
@@ -408,6 +421,12 @@ public class PlanExecutor implements IPlanExecutor {
return true;
case SHOW_QUERY_RESOURCE:
return processShowQueryResource();
+ case SET_ARCHIVING:
+ operateSetArchiving((SetArchivingPlan) plan);
+ return true;
+ case PAUSE_ARCHIVING:
+ operatePauseArchiving((PauseArchivingPlan) plan);
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorName()));
@@ -760,6 +779,8 @@ public class PlanExecutor implements IPlanExecutor {
return processShowPathsSetSchemaTemplate((ShowPathsSetTemplatePlan) showPlan);
case PATHS_USING_SCHEMA_TEMPLATE:
return processShowPathsUsingSchemaTemplate((ShowPathsUsingTemplatePlan) showPlan);
+ case SHOW_ARCHIVING:
+ return processShowArchiving((ShowArchivingPlan) showPlan);
default:
throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
}
@@ -1261,6 +1282,83 @@ public class PlanExecutor implements IPlanExecutor {
return TriggerRegistrationService.getInstance().show();
}
+ /**
+  * Builds the result of SHOW ARCHIVING: one row per archiving task, restricted to the storage
+  * groups named in the plan when any are given. Start time and expire time are left null when
+  * the task holds {@code Long.MAX_VALUE} for them.
+  */
+ private QueryDataSet processShowArchiving(ShowArchivingPlan showArchivingPlan) {
+   List<PartialPath> headerPaths =
+       Arrays.asList(
+           new PartialPath(COLUMN_TASK_ID, false),
+           new PartialPath(COLUMN_SUBMIT_TIME, false),
+           new PartialPath(COLUMN_STORAGE_GROUP, false),
+           new PartialPath(COLUMN_ARCHIVING_STATUS, false),
+           new PartialPath(COLUMN_ARCHIVING_START_TIME, false),
+           new PartialPath(COLUMN_EXPIRE_TIME, false),
+           new PartialPath(COLUMN_ARCHIVING_TARGET_DIRECTORY, false));
+   List<TSDataType> headerTypes =
+       Arrays.asList(
+           TSDataType.INT64,
+           TSDataType.TEXT,
+           TSDataType.TEXT,
+           TSDataType.TEXT,
+           TSDataType.TEXT,
+           TSDataType.INT64,
+           TSDataType.TEXT);
+   ListDataSet listDataSet = new ListDataSet(headerPaths, headerTypes);
+
+   Set<PartialPath> selectedSgs = new HashSet<>(showArchivingPlan.getStorageGroups());
+   List<ArchivingTask> archivingTaskList =
+       StorageEngine.getInstance().getArchivingManager().getArchivingTasks();
+
+   int timestamp = 0;
+   for (ArchivingTask task : archivingTaskList) {
+     PartialPath sgName = task.getStorageGroup();
+     // an empty selection means "show every storage group"
+     boolean wanted = selectedSgs.isEmpty() || selectedSgs.contains(sgName);
+     if (!wanted) {
+       continue;
+     }
+
+     Field taskId = new Field(TSDataType.INT64);
+     taskId.setLongV(task.getTaskId());
+
+     Field sg = new Field(TSDataType.TEXT);
+     sg.setBinaryV(new Binary(sgName.getFullPath()));
+
+     Field status = new Field(TSDataType.TEXT);
+     status.setBinaryV(new Binary(task.getStatus().name()));
+
+     Field targetDir = new Field(TSDataType.TEXT);
+     targetDir.setBinaryV(new Binary(task.getTargetDir().getPath()));
+
+     // expire time stays null when no ttl is set
+     Field ttl = null;
+     if (task.getTTL() != Long.MAX_VALUE) {
+       ttl = new Field(TSDataType.INT64);
+       ttl.setLongV(task.getTTL());
+     }
+
+     Field submitTime = new Field(TSDataType.TEXT);
+     submitTime.setBinaryV(
+         new Binary(
+             DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(
+                 DatetimeUtils.convertMillsecondToZonedDateTime(task.getSubmitTime()))));
+
+     // start time stays null when it is Long.MAX_VALUE
+     Field startTime = null;
+     if (task.getStartTime() != Long.MAX_VALUE) {
+       startTime = new Field(TSDataType.TEXT);
+       startTime.setBinaryV(
+           new Binary(
+               DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(
+                   DatetimeUtils.convertMillsecondToZonedDateTime(task.getStartTime()))));
+     }
+
+     // the fields are added in the same order as the header columns
+     RowRecord rowRecord = new RowRecord(timestamp++);
+     rowRecord.addField(taskId);
+     rowRecord.addField(submitTime);
+     rowRecord.addField(sg);
+     rowRecord.addField(status);
+     rowRecord.addField(startTime);
+     rowRecord.addField(ttl);
+     rowRecord.addField(targetDir);
+     listDataSet.putRecord(rowRecord);
+   }
+
+   return listDataSet;
+ }
+
+
private void addRowRecordForShowQuery(
ListDataSet listDataSet, int timestamp, String item, String value) {
RowRecord rowRecord = new RowRecord(timestamp);
@@ -1598,6 +1696,44 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ /**
+  * Executes a SetArchivingPlan. A plan that carries a target directory sets archiving on every
+  * storage group matched by the plan's path; a plan without one is a cancel request.
+  *
+  * @throws QueryProcessException if resolving the matched storage groups fails
+  */
+ private void operateSetArchiving(SetArchivingPlan plan) throws QueryProcessException {
+   if (plan.getTargetDir() != null) {
+     try {
+       List<PartialPath> matchedStorageGroups =
+           IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup(), plan.isPrefixMatch());
+       for (PartialPath matched : matchedStorageGroups) {
+         StorageEngine.getInstance()
+             .setArchiving(matched, plan.getTargetDir(), plan.getTTL(), plan.getStartTime());
+       }
+     } catch (MetadataException e) {
+       throw new QueryProcessException(e);
+     }
+     return;
+   }
+
+   // no target directory: is cancel plan
+   StorageEngine.getInstance()
+       .operateArchiving(
+           ArchivingOperate.ArchivingOperateType.CANCEL, plan.getTaskId(), plan.getStorageGroup());
+ }
+
+
+ /** Executes a PauseArchivingPlan as a PAUSE when the plan is a pause, as a RESUME otherwise. */
+ private void operatePauseArchiving(PauseArchivingPlan plan) {
+   ArchivingOperate.ArchivingOperateType operateType =
+       plan.isPause()
+           ? ArchivingOperate.ArchivingOperateType.PAUSE
+           : ArchivingOperate.ArchivingOperateType.RESUME;
+
+   StorageEngine.getInstance()
+       .operateArchiving(operateType, plan.getTaskId(), plan.getStorageGroup());
+ }
+
+
@Override
public void update(PartialPath path, long startTime, long endTime, String value) {
throw new UnsupportedOperationException("update is not supported now");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 677eadcb51..5708665b83 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -196,6 +196,10 @@ public abstract class Operator {
SHOW_QUERY_RESOURCE,
- DEACTIVATE_TEMPLATE
+ DEACTIVATE_TEMPLATE,
+
+ SET_ARCHIVING,
+ PAUSE_ARCHIVING,
+ SHOW_ARCHIVING
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java
new file mode 100644
index 0000000000..fde4e7bc39
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CancelArchivingOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+ /**
+  * Logical operator for cancelling archiving. The task to cancel is identified by storage group
+  * or by task id; the operator is translated into a {@link SetArchivingPlan}.
+  */
+ public class CancelArchivingOperator extends Operator {
+   /** storage group to cancel archiving for; null when not given */
+   private PartialPath storageGroup;
+
+   /** id of the task to cancel; -1 when not given */
+   private long taskId = -1;
+
+   public CancelArchivingOperator(int tokenIntType) {
+     super(tokenIntType);
+     this.operatorType = OperatorType.SET_ARCHIVING;
+   }
+
+   public void setStorageGroup(PartialPath storageGroup) {
+     this.storageGroup = storageGroup;
+   }
+
+   public PartialPath getStorageGroup() {
+     return storageGroup;
+   }
+
+   public void setTaskId(long taskId) {
+     this.taskId = taskId;
+   }
+
+   public long getTaskId() {
+     return taskId;
+   }
+
+   /**
+    * Builds the plan from the storage group when one is set, otherwise from the task id when one
+    * is set, otherwise an empty plan.
+    */
+   @Override
+   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+       throws QueryProcessException {
+     if (storageGroup != null) {
+       return new SetArchivingPlan(storageGroup);
+     }
+     if (taskId != -1) {
+       return new SetArchivingPlan(taskId);
+     }
+     return new SetArchivingPlan();
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java
new file mode 100644
index 0000000000..a55d010e28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/PauseArchivingOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+ /**
+  * Logical operator for pausing archiving. The task to pause is identified by storage group or
+  * by task id; the operator is translated into a {@link PauseArchivingPlan} with pause = true.
+  */
+ public class PauseArchivingOperator extends Operator {
+   /** id of the task to pause; -1 when not given */
+   private long taskId = -1;
+
+   /** storage group to pause archiving for; null when not given */
+   private PartialPath storageGroup;
+
+   public PauseArchivingOperator(int tokenIntType) {
+     super(tokenIntType);
+     this.operatorType = OperatorType.PAUSE_ARCHIVING;
+   }
+
+   public void setTaskId(long taskId) {
+     this.taskId = taskId;
+   }
+
+   public long getTaskId() {
+     return taskId;
+   }
+
+   public void setStorageGroup(PartialPath storageGroup) {
+     this.storageGroup = storageGroup;
+   }
+
+   public PartialPath getStorageGroup() {
+     return storageGroup;
+   }
+
+   /**
+    * Builds the pause plan from the storage group when one is set, otherwise from the task id
+    * when one is set, otherwise a plan carrying only the pause flag.
+    */
+   @Override
+   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+       throws QueryProcessException {
+     if (storageGroup != null) {
+       return new PauseArchivingPlan(storageGroup, true);
+     }
+     if (taskId != -1) {
+       return new PauseArchivingPlan(taskId, true);
+     }
+     return new PauseArchivingPlan(true);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java
new file mode 100644
index 0000000000..9bf53bf237
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ResumeArchivingOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+ /**
+  * Logical operator for resuming archiving. The task to resume is identified by storage group or
+  * by task id; the operator is translated into a {@link PauseArchivingPlan} with pause = false.
+  */
+ public class ResumeArchivingOperator extends Operator {
+   /** id of the task to resume; -1 when not given */
+   private long taskId = -1;
+
+   /** storage group to resume archiving for; null when not given */
+   private PartialPath storageGroup;
+
+   public ResumeArchivingOperator(int tokenIntType) {
+     super(tokenIntType);
+     // resume shares the PAUSE_ARCHIVING operator type; the plan's flag tells them apart
+     this.operatorType = OperatorType.PAUSE_ARCHIVING;
+   }
+
+   public void setTaskId(long taskId) {
+     this.taskId = taskId;
+   }
+
+   public long getTaskId() {
+     return taskId;
+   }
+
+   public void setStorageGroup(PartialPath storageGroup) {
+     this.storageGroup = storageGroup;
+   }
+
+   public PartialPath getStorageGroup() {
+     return storageGroup;
+   }
+
+   /**
+    * Builds the resume plan from the storage group when one is set, otherwise from the task id
+    * when one is set, otherwise a plan carrying only the (false) pause flag.
+    */
+   @Override
+   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+       throws QueryProcessException {
+     if (storageGroup != null) {
+       return new PauseArchivingPlan(storageGroup, false);
+     }
+     if (taskId != -1) {
+       return new PauseArchivingPlan(taskId, false);
+     }
+     return new PauseArchivingPlan(false);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java
new file mode 100644
index 0000000000..0a2b482037
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetArchivingOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import java.io.File;
+
+/**
+ * Logical operator for creating an archiving task.
+ *
+ * <p>All four settings start out as {@code null} and must be assigned before {@link
+ * #generatePhysicalPlan(PhysicalGenerator)} is called, otherwise that method rejects the
+ * statement.
+ */
+public class SetArchivingOperator extends Operator {
+  private PartialPath storageGroup = null;
+  private File targetDir = null;
+  private Long ttl = null;
+  private Long startTime = null;
+
+  public SetArchivingOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.SET_ARCHIVING;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public File getTargetDir() {
+    return targetDir;
+  }
+
+  public void setTargetDir(File targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  /** Unboxes the stored value, so this throws a NullPointerException while the ttl is unset. */
+  public long getTTL() {
+    return ttl.longValue();
+  }
+
+  public void setTTL(long ttl) {
+    this.ttl = Long.valueOf(ttl);
+  }
+
+  /**
+   * Unboxes the stored value, so this throws a NullPointerException while the start time is unset.
+   */
+  public long getStartTime() {
+    return startTime.longValue();
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = Long.valueOf(startTime);
+  }
+
+  /**
+   * Verifies that every setting is present and wraps them in a {@link SetArchivingPlan}.
+   *
+   * @throws SQLParserException naming the first missing setting, checked in the order storage
+   *     group, start time, ttl, target directory
+   */
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    final boolean hasStorageGroup = storageGroup != null;
+    if (!hasStorageGroup) {
+      throw new SQLParserException("storage_group not specified");
+    }
+    final boolean hasStartTime = startTime != null;
+    if (!hasStartTime) {
+      throw new SQLParserException("start_time not specified");
+    }
+    final boolean hasTtl = ttl != null;
+    if (!hasTtl) {
+      throw new SQLParserException("ttl not specified");
+    }
+    final boolean hasTargetDir = targetDir != null;
+    if (!hasTargetDir) {
+      throw new SQLParserException("target_dir not specified");
+    }
+
+    return new SetArchivingPlan(storageGroup, targetDir, ttl, startTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
new file mode 100644
index 0000000000..575be7c063
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowArchivingOperator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import java.util.List;
+
+/** Logical operator for listing archiving tasks, carrying the storage groups given by the user. */
+public class ShowArchivingOperator extends ShowOperator {
+  private final List<PartialPath> storageGroups;
+
+  /**
+   * @param storageGroups storage groups named in the statement; stored as-is and handed to the
+   *     generated plan
+   */
+  public ShowArchivingOperator(List<PartialPath> storageGroups) {
+    super(SQLConstant.TOK_SHOW, OperatorType.SHOW_ARCHIVING);
+    this.storageGroups = storageGroups;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator) {
+    return new ShowArchivingPlan(storageGroups);
+  }
+
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 2857c45752..443561c703 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -562,7 +562,8 @@ public abstract class PhysicalPlan {
APPEND_TEMPLATE,
PRUNE_TEMPLATE,
DROP_TEMPLATE,
- DEACTIVATE_TEMPLATE
+ DEACTIVATE_TEMPLATE,
+ ARCHIVING
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java
new file mode 100644
index 0000000000..cdbc63ca7a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PauseArchivingPlan.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.util.List;
+
+/**
+ * Physical plan shared by the pause and resume archiving statements.
+ *
+ * <p>The {@code pause} flag distinguishes the two; the target is a storage group, a task id, or
+ * neither (in which case {@code storageGroup} is {@code null} and {@code taskId} is {@code -1}).
+ */
+public class PauseArchivingPlan extends PhysicalPlan {
+  private PartialPath storageGroup;
+  private long taskId = -1;
+  private boolean pause = true;
+
+  /** Creates a plan without a storage group or task id. */
+  public PauseArchivingPlan(boolean pause) {
+    super(Operator.OperatorType.PAUSE_ARCHIVING);
+    this.pause = pause;
+  }
+
+  /** Creates a plan that targets a storage group. */
+  public PauseArchivingPlan(PartialPath storageGroup, boolean pause) {
+    this(pause);
+    this.storageGroup = storageGroup;
+  }
+
+  /** Creates a plan that targets a single task id. */
+  public PauseArchivingPlan(long taskId, boolean pause) {
+    this(pause);
+    this.taskId = taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public boolean isPause() {
+    return pause;
+  }
+
+  /** This plan reports no paths; callers receive {@code null}. */
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java
new file mode 100644
index 0000000000..b686a452df
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetArchivingPlan.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Physical plan describing an archiving task: which storage group to archive, where to move the
+ * files, and the ttl / start time that control it.
+ *
+ * <p>The single-argument constructors build "cancel" plans by filling the remaining fields with
+ * {@code null} / {@link Long#MAX_VALUE}. Such plans cannot be serialized, because both serialize
+ * methods dereference {@code storageGroup} and {@code targetDir}.
+ */
+public class SetArchivingPlan extends PhysicalPlan {
+  /** {@code -1} means that no task id has been assigned. */
+  private long taskId = -1;
+
+  private PartialPath storageGroup;
+  private File targetDir;
+  private long ttl;
+  private long startTime;
+
+  /** Creates an empty plan, to be populated by {@link #deserialize(ByteBuffer)} or the setters. */
+  public SetArchivingPlan() {
+    super(OperatorType.SET_ARCHIVING);
+  }
+
+  public SetArchivingPlan(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    super(OperatorType.SET_ARCHIVING);
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+  }
+
+  public SetArchivingPlan(
+      long taskId, PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    // set archiving w/ taskId
+    super(OperatorType.SET_ARCHIVING);
+    this.taskId = taskId;
+    this.storageGroup = storageGroup;
+    this.targetDir = targetDir;
+    this.ttl = ttl;
+    this.startTime = startTime;
+  }
+
+  public SetArchivingPlan(PartialPath storageGroup) {
+    // cancel archiving using storage group
+    this(storageGroup, null, Long.MAX_VALUE, Long.MAX_VALUE);
+  }
+
+  public SetArchivingPlan(long taskId) {
+    // cancel archiving using taskId
+    this(taskId, null, null, Long.MAX_VALUE, Long.MAX_VALUE);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Writes the plan as: type byte, ttl, start time, storage group path, target directory path,
+   * task id.
+   *
+   * @throws IOException if writing to the stream fails
+   * @throws NullPointerException if the storage group or the target directory is {@code null}
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    int type = PhysicalPlanType.ARCHIVING.ordinal();
+    stream.writeByte((byte) type);
+    stream.writeLong(ttl);
+    stream.writeLong(startTime);
+    putString(stream, storageGroup.getFullPath());
+    putString(stream, targetDir.getAbsolutePath());
+
+    stream.writeLong(taskId);
+  }
+
+  /**
+   * Buffer counterpart of {@link #serialize(DataOutputStream)}; uses the same layout.
+   *
+   * @throws NullPointerException if the storage group or the target directory is {@code null}
+   */
+  @Override
+  public void serializeImpl(ByteBuffer buffer) {
+    // Must be ARCHIVING, exactly as in serialize(DataOutputStream): tagging the record with the
+    // TTL type would make a reader dispatch on the wrong plan class for this payload.
+    int type = PhysicalPlanType.ARCHIVING.ordinal();
+    buffer.put((byte) type);
+    buffer.putLong(ttl);
+    buffer.putLong(startTime);
+    putString(buffer, storageGroup.getFullPath());
+    putString(buffer, targetDir.getAbsolutePath());
+
+    buffer.putLong(taskId);
+  }
+
+  /**
+   * Restores the fields in the order they were written; the leading type byte is not read here,
+   * so the buffer must already be positioned after it.
+   *
+   * @throws IllegalPathException if the stored storage group path is not a valid path
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    this.ttl = buffer.getLong();
+    this.startTime = buffer.getLong();
+    this.storageGroup = new PartialPath(readString(buffer));
+    this.targetDir = new File(readString(buffer));
+
+    this.taskId = buffer.getLong();
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public PartialPath getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(PartialPath storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public File getTargetDir() {
+    return targetDir;
+  }
+
+  public void setTargetDir(File targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  public long getTTL() {
+    return ttl;
+  }
+
+  public void setTTL(long ttl) {
+    this.ttl = ttl;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
index 42eaf481d3..1b7cd70549 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowArchivingPlan.java
@@ -16,12 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.storagegroup;
+package org.apache.iotdb.db.qp.physical.sys;
-public enum TsFileResourceStatus {
- UNCLOSED,
- CLOSED,
- COMPACTION_CANDIDATE,
- COMPACTING,
- DELETED
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.List;
+
+/** Physical plan for listing archiving tasks, optionally restricted to some storage groups. */
+public class ShowArchivingPlan extends ShowPlan {
+  private final List<PartialPath> storageGroups;
+
+  /**
+   * @param storageGroups storage groups named in the statement; stored as-is without copying
+   */
+  public ShowArchivingPlan(List<PartialPath> storageGroups) {
+    super(ShowContentType.SHOW_ARCHIVING);
+    this.storageGroups = storageGroups;
+  }
+
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
+  }
+
+  /** This plan reports no paths; callers receive {@code null}. */
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 021814e1b8..35e5f77132 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -127,5 +127,6 @@ public class ShowPlan extends PhysicalPlan {
NODES_IN_SCHEMA_TEMPLATE,
PATHS_SET_SCHEMA_TEMPLATE,
PATHS_USING_SCHEMA_TEMPLATE,
+ SHOW_ARCHIVING
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 70c737a6ee..47dd2c1a77 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -106,6 +106,8 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.StringContainer;
@@ -785,6 +787,126 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
return operator;
}
+ // Set Archiving
+ /**
+  * Builds a {@link SetArchivingOperator} from the settings given directly on the statement and
+  * then from every nested set-archiving clause; a clause that is applied later overwrites a
+  * setting assigned earlier.
+  *
+  * @throws SQLParserException if a target directory is given but does not exist or is not a
+  *     directory
+  */
+ @Override
+ public Operator visitSetArchiving(IoTDBSqlParser.SetArchivingContext ctx) {
+   SetArchivingOperator operator = new SetArchivingOperator(SQLConstant.TOK_SET);
+
+   if (ctx.storageGroup != null) {
+     operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+   }
+   if (ctx.ttl != null) {
+     operator.setTTL(Long.parseLong(ctx.ttl.getText()));
+   }
+   if (ctx.startTime != null) {
+     operator.setStartTime(parseDateFormat(ctx.startTime.getText()));
+   }
+   if (ctx.targetDir != null) {
+     operator.setTargetDir(resolveArchivingTargetDir(ctx.targetDir.getText()));
+   }
+
+   // parse the setArchivingClause
+   for (IoTDBSqlParser.SetArchivingClauseContext setArchivingClauseContext :
+       ctx.setArchivingClause()) {
+     parseSetArchivingClause(operator, setArchivingClauseContext);
+   }
+
+   return operator;
+ }
+
+ /** Copies every setting present in one set-archiving clause onto the operator. */
+ private void parseSetArchivingClause(
+     SetArchivingOperator operator, IoTDBSqlParser.SetArchivingClauseContext ctx) {
+   if (ctx.storageGroup != null) {
+     operator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+   }
+   if (ctx.ttl != null) {
+     operator.setTTL(Long.parseLong(ctx.ttl.getText()));
+   }
+   if (ctx.startTime != null) {
+     operator.setStartTime(parseDateFormat(ctx.startTime.getText()));
+   }
+   if (ctx.targetDir != null) {
+     operator.setTargetDir(resolveArchivingTargetDir(ctx.targetDir.getText()));
+   }
+ }
+
+ /**
+  * Turns the target-directory string literal of a set-archiving statement into a file handle and
+  * checks that it points at an existing directory.
+  *
+  * @param targetDirLiteral raw text of the target-directory literal as it appears in the SQL
+  * @return the resolved directory
+  * @throws SQLParserException if the path does not exist or is not a directory
+  */
+ private File resolveArchivingTargetDir(String targetDirLiteral) {
+   FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+   File targetDir = fsFactory.getFile(parseStringLiteral(targetDirLiteral));
+   if (!targetDir.exists()) {
+     throw new SQLParserException("unknown directory");
+   } else if (!targetDir.isDirectory()) {
+     throw new SQLParserException("not a directory");
+   }
+   return targetDir;
+ }
+
+ // Cancel Archiving
+ /**
+  * Builds a {@link CancelArchivingOperator} targeting either a storage group or a task id; the
+  * storage group wins when both are present.
+  *
+  * @throws SQLParserException if the statement names neither
+  */
+ @Override
+ public Operator visitCancelArchiving(IoTDBSqlParser.CancelArchivingContext ctx) {
+   CancelArchivingOperator cancelOperator = new CancelArchivingOperator(SQLConstant.TOK_UNSET);
+   if (ctx.storageGroup == null && ctx.taskId == null) {
+     // unknown case
+     throw new SQLParserException("cancel archiving unknown case");
+   }
+   if (ctx.storageGroup != null) {
+     cancelOperator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+   } else {
+     cancelOperator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+   }
+   return cancelOperator;
+ }
+
+ // Pause Archiving
+ /**
+  * Builds a {@link PauseArchivingOperator} targeting either a storage group or a task id; the
+  * storage group wins when both are present.
+  *
+  * @throws SQLParserException if the statement names neither
+  */
+ @Override
+ public Operator visitPauseArchiving(IoTDBSqlParser.PauseArchivingContext ctx) {
+   PauseArchivingOperator pauseOperator = new PauseArchivingOperator(SQLConstant.TOK_SET);
+   if (ctx.storageGroup == null && ctx.taskId == null) {
+     // unknown case
+     throw new SQLParserException("pause archiving unknown case");
+   }
+   if (ctx.storageGroup != null) {
+     pauseOperator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+   } else {
+     pauseOperator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+   }
+   return pauseOperator;
+ }
+
+ // Resume Archiving
+ /**
+  * Builds a {@link ResumeArchivingOperator} targeting either a storage group or a task id; the
+  * storage group wins when both are present.
+  *
+  * @throws SQLParserException if the statement names neither
+  */
+ @Override
+ public Operator visitResumeArchiving(IoTDBSqlParser.ResumeArchivingContext ctx) {
+   ResumeArchivingOperator resumeOperator = new ResumeArchivingOperator(SQLConstant.TOK_UNSET);
+   if (ctx.storageGroup == null && ctx.taskId == null) {
+     // unknown case
+     throw new SQLParserException("resume archiving unknown case");
+   }
+   if (ctx.storageGroup != null) {
+     resumeOperator.setStorageGroup(parsePrefixPath(ctx.storageGroup));
+   } else {
+     resumeOperator.setTaskId(Long.parseLong(ctx.taskId.getText()));
+   }
+   return resumeOperator;
+ }
+
+ // Show Archiving
+ @Override
+ public Operator visitShowArchiving(IoTDBSqlParser.ShowArchivingContext ctx) {
+ List<PartialPath> storageGroups = new ArrayList<>();
+ List<IoTDBSqlParser.PrefixPathContext> prefixPathList = ctx.prefixPath();
+ for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPathList) {
+ storageGroups.add(parsePrefixPath(prefixPath));
+ }
+ return new ShowArchivingOperator(storageGroups);
+ }
+
+ // Show All Archiving
+ /** Builds a {@link ShowArchivingOperator} with an empty storage-group list. */
+ @Override
+ public Operator visitShowAllArchiving(IoTDBSqlParser.ShowAllArchivingContext ctx) {
+   return new ShowArchivingOperator(new ArrayList<PartialPath>());
+ }
+
// Start Trigger
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 1333674805..db237b904e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
@@ -140,6 +141,7 @@ public class IoTDB implements IoTDBMBean {
logger.info("recover the schema...");
initMManager();
initServiceProvider();
+ initArchivingManager();
registerManager.register(JMXService.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -226,6 +228,11 @@ public class IoTDB implements IoTDBMBean {
IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold());
}
+ /** Initializes the singleton {@link ArchivingManager} during server setup. */
+ private void initArchivingManager() {
+   // recover ArchivingTasks, finish archiving unfinished tsfiles, start check threads
+   ArchivingManager archivingManager = ArchivingManager.getInstance();
+   archivingManager.init();
+ }
+
@Override
public void stop() {
deactivate();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java
new file mode 100644
index 0000000000..67b316531c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingOperateWriterReaderTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round-trip tests for {@link ArchivingOperateWriter} and {@link ArchivingOperateReader}: logs
+ * written by the writer must be read back unchanged, and a trailing partial record must be cut
+ * off the log file.
+ */
+public class ArchivingOperateWriterReaderTest {
+  private static final String filePath = "logtest.test";
+  private final String sg1 = "root.ARCHIVING_SG1";
+  private final String sg2 = "root.ARCHIVING_SG2";
+  private long startTime; // 2023-01-01
+  private final long ttl = 2000;
+  private final String targetDirPath = Paths.get("data", "separated").toString();
+  List<ArchivingOperate> archivingOperate;
+  ArchivingTask task1, task2;
+
+  @Before
+  public void prepare() throws IllegalPathException, LogicalOperatorException {
+    if (new File(filePath).exists()) {
+      new File(filePath).delete();
+    }
+    // Resolve the start time first: the tasks copy the value at construction, so assigning it
+    // afterwards would leave them with the field's default of 0.
+    startTime = DatetimeUtils.convertDatetimeStrToLong("2023-01-01", ZoneId.systemDefault());
+
+    task1 = new ArchivingTask(120, new PartialPath(sg1), new File(targetDirPath), startTime, ttl);
+    task2 = new ArchivingTask(999, new PartialPath(sg2), new File(targetDirPath), startTime, ttl);
+
+    archivingOperate = new ArrayList<>();
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.START, task1));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.SET, task1));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.CANCEL, task2));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.PAUSE, task2));
+    archivingOperate.add(new ArchivingOperate(ArchivingOperate.ArchivingOperateType.RESUME, task2));
+
+    task1.close();
+    task2.close();
+  }
+
+  /** Writes the same operations, in the same order, as the expected list built in prepare(). */
+  public void writeLog(ArchivingOperateWriter writer) throws IOException {
+    writer.log(ArchivingOperate.ArchivingOperateType.START, task1);
+    writer.log(ArchivingOperate.ArchivingOperateType.SET, task1);
+    writer.log(ArchivingOperate.ArchivingOperateType.CANCEL, task2);
+    writer.log(ArchivingOperate.ArchivingOperateType.PAUSE, task2);
+    writer.log(ArchivingOperate.ArchivingOperateType.RESUME, task2);
+  }
+
+  /** check if two logs have equal fields */
+  public boolean logEquals(ArchivingOperate log1, ArchivingOperate log2) {
+    if (log1.getType() != log2.getType()) {
+      return false;
+    }
+    if (log1.getTask().getTaskId() != log2.getTask().getTaskId()) {
+      return false;
+    }
+
+    if (log1.getType() == ArchivingOperate.ArchivingOperateType.SET) {
+      // check other fields only if SET
+      if (log1.getTask().getStartTime() != log2.getTask().getStartTime()) {
+        return false;
+      }
+      if (log1.getTask().getTTL() != log2.getTask().getTTL()) {
+        return false;
+      }
+      if (!log1.getTask()
+          .getStorageGroup()
+          .getFullPath()
+          .equals(log2.getTask().getStorageGroup().getFullPath())) {
+        return false;
+      }
+      if (!log1.getTask()
+          .getTargetDir()
+          .getPath()
+          .equals(log2.getTask().getTargetDir().getPath())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /** Drains the reader and asserts that it yields exactly the expected operations, in order. */
+  private void assertReadsExpectedLogs(ArchivingOperateReader reader) throws Exception {
+    List<ArchivingOperate> res = new ArrayList<>();
+    while (reader.hasNext()) {
+      res.add(reader.next());
+    }
+    // Check the count first so a short or long result fails with a clear message instead of an
+    // IndexOutOfBoundsException or a silently ignored extra record.
+    assertEquals(archivingOperate.size(), res.size());
+    for (int i = 0; i < archivingOperate.size(); i++) {
+      assertTrue(logEquals(archivingOperate.get(i), res.get(i)));
+    }
+  }
+
+  @Test
+  public void testWriteAndRead() throws Exception {
+    try {
+      // try-with-resources closes the writer even if logging fails
+      try (ArchivingOperateWriter writer = new ArchivingOperateWriter(filePath)) {
+        writeLog(writer);
+      }
+      try (ArchivingOperateReader reader = new ArchivingOperateReader(new File(filePath))) {
+        assertReadsExpectedLogs(reader);
+      }
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  @Test
+  public void testTruncateBrokenLogs() throws Exception {
+    try {
+      // write normal data
+      try (ArchivingOperateWriter writer = new ArchivingOperateWriter(filePath)) {
+        writeLog(writer);
+      }
+      long expectedLength = new File(filePath).length();
+
+      // just write partial content
+      try (FileOutputStream outputStream = new FileOutputStream(filePath, true);
+          FileChannel channel = outputStream.getChannel()) {
+        ByteBuffer logBuffer = ByteBuffer.allocate(4 * 30);
+        for (int i = 0; i < 20; ++i) {
+          logBuffer.putInt(Integer.MIN_VALUE);
+        }
+        logBuffer.flip();
+        // the length prefix announces the full capacity (120 bytes) although only 80 bytes
+        // follow, which is what makes this record a broken one
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+        lengthBuffer.putInt(logBuffer.capacity());
+        lengthBuffer.flip();
+
+        channel.write(lengthBuffer);
+        channel.write(logBuffer);
+        channel.force(true);
+      }
+
+      // read & check
+      try (ArchivingOperateReader reader = new ArchivingOperateReader(new File(filePath))) {
+        assertReadsExpectedLogs(reader);
+      }
+      assertEquals(expectedLength, new File(filePath).length());
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java
new file mode 100644
index 0000000000..e1092a35dd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingRecoverTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivingRecoverTest {
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final File ARCHIVING_LOG_DIR =
+ SystemFileFactory.INSTANCE.getFile(
+ Paths.get(
+ FilePathUtils.regularizePath(config.getSystemDir()),
+ IoTDBConstant.ARCHIVING_FOLDER_NAME,
+ IoTDBConstant.ARCHIVING_LOG_FOLDER_NAME)
+ .toString());
+ private long testTaskId = 99;
+ private File testLogFile;
+ private List<File> testFiles;
+ private File testTargetDir;
+
+ @Before
+ public void setUp()
+ throws MetadataException, StorageGroupProcessorException, LogicalOperatorException {
+ EnvironmentUtils.envSetUp();
+
+ testLogFile = SystemFileFactory.INSTANCE.getFile(ARCHIVING_LOG_DIR, testTaskId + ".log");
+
+ testTargetDir = new File("testTargetDir");
+ testTargetDir.mkdirs();
+
+ testFiles = new ArrayList<>();
+ testFiles.add(new File("test.tsfile"));
+ testFiles.add(new File("test.tsfile.resource"));
+ testFiles.add(new File("test.tsfile.mods"));
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ EnvironmentUtils.cleanEnv();
+
+ FileUtils.deleteDirectory(testTargetDir);
+ deleteTestFiles();
+ }
+
+ private void setupTestFiles() throws IOException {
+ for (File file : testFiles) {
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+ }
+ }
+
+ private void deleteTestFiles() {
+ for (File file : testFiles) {
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+ }
+
+ private void cleanupTargetDir() {
+ for (File file : testTargetDir.listFiles()) {
+ file.delete();
+ }
+ }
+
+ private File getTsFile() {
+ return testFiles.get(0);
+ }
+
+ @Test
+ public void testWriteAndRead() throws Exception {
+ // create test files
+ setupTestFiles();
+
+ // test write
+ ArchivingTask task = new ArchivingTask(testTaskId, null, testTargetDir, 0, 0);
+ task.startTask();
+ task.startFile(getTsFile());
+ task.close();
+
+ FileInputStream fileInputStream = new FileInputStream(testLogFile);
+
+ assertTrue(testLogFile.exists());
+ assertEquals(testTargetDir.getAbsolutePath(), ReadWriteIOUtils.readString(fileInputStream));
+ assertEquals(getTsFile().getAbsolutePath(), ReadWriteIOUtils.readString(fileInputStream));
+ assertEquals(0, fileInputStream.available());
+
+ fileInputStream.close();
+
+ // test read
+ ArchivingRecover recover = new ArchivingRecover();
+ recover.recover();
+
+ for (File file : testFiles) {
+ assertFalse(file.exists());
+ }
+
+ assertFalse(testLogFile.exists());
+
+ assertEquals(3, testTargetDir.listFiles().length);
+ cleanupTargetDir();
+ }
+
+ @Test
+ public void testMissingFiles() throws IOException {
+ // test missing .tsfile .resource .mods
+ File missingTsFile = new File("testMissing.tsfile");
+ File missingTsFileRes = new File("testMissing.tsfile.resource");
+ File missingTsFileMods = new File("testMissing.tsfile.mods");
+
+ assertFalse(missingTsFile.exists());
+ assertFalse(missingTsFileRes.exists());
+ assertFalse(missingTsFileMods.exists());
+
+ testLogFile.createNewFile();
+
+ FileOutputStream logOutput = new FileOutputStream(testLogFile);
+
+ ReadWriteIOUtils.write(testTargetDir.getAbsolutePath(), logOutput);
+ ReadWriteIOUtils.write(missingTsFile.getAbsolutePath(), logOutput);
+
+ logOutput.close();
+
+ // test read
+ ArchivingRecover recover = new ArchivingRecover();
+ recover.recover();
+
+ assertEquals(0, testTargetDir.listFiles().length);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
new file mode 100644
index 0000000000..7a6a5eb626
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/archiving/ArchivingTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.engine.archiving;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.sys.PauseArchivingPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetArchivingPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowArchivingPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ArchivingTest {
+ // storage group names used in the tests; createSchemas() builds the processor and series on sg1
+ private final String sg1 = "root.ARCHIVING_SG1";
+ private final String sg2 = "root.ARCHIVING_SG2";
+ // ttl argument passed to ArchivingManager.setArchiving in testShowArchiving
+ private final long ttl = 12345;
+ private long startTime; // parsed from "2023-01-01" with the system default zone in setUp()
+ // processor for sg1, assigned in createSchemas()
+ private VirtualStorageGroupProcessor virtualStorageGroupProcessor;
+ private final String s1 = "s1";
+ // path of the time series registered in createSchemas(): sg1 + separator + s1
+ private final String g1s1 = sg1 + IoTDBConstant.PATH_SEPARATOR + s1;
+ // partition interval read from the config in setUp() and written back in tearDown()
+ private long prevPartitionInterval;
+ // directory handed to archiving tasks as their target; created in setUp(), deleted in tearDown()
+ private File targetDir;
+
+ /**
+ * Prepares each test: saves the configured partition interval and overrides it with 86400,
+ * sets up the test environment, creates the schema for sg1, creates the "separated_test"
+ * target directory and parses the start time "2023-01-01".
+ */
+ @Before
+ public void setUp()
+ throws MetadataException, StorageGroupProcessorException, LogicalOperatorException {
+ // remember the current value so tearDown() can put it back
+ prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ // NOTE(review): 86400 looks like one day expressed in seconds — confirm the unit of the setting
+ IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(86400);
+ EnvironmentUtils.envSetUp();
+ createSchemas();
+ targetDir = new File("separated_test");
+ targetDir.mkdirs();
+
+ startTime = DatetimeUtils.convertDatetimeStrToLong("2023-01-01", ZoneId.systemDefault());
+ }
+
+  /**
+   * Closes the working tsfile processors, cleans the test environment, writes the saved
+   * partition interval back to the config and deletes the archiving target directory together
+   * with the files directly inside it.
+   *
+   * <p>JUnit runs {@code @After} methods even when a {@code @Before} method failed, so the fields
+   * that {@code setUp()} may not have reached are null-checked. The restore and directory cleanup
+   * sit in a {@code finally} block so that a failure while closing or cleaning does not leak the
+   * overridden partition interval or the target directory into later tests.
+   *
+   * @throws IOException propagated from cleaning the environment
+   * @throws StorageEngineException propagated from cleaning the environment
+   */
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    try {
+      if (virtualStorageGroupProcessor != null) {
+        virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+      }
+      EnvironmentUtils.cleanEnv();
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+      if (targetDir != null) {
+        // listFiles() returns null when the directory is missing or unreadable
+        File[] movedFiles = targetDir.listFiles();
+        if (movedFiles != null) {
+          for (File file : movedFiles) {
+            file.delete();
+          }
+        }
+        targetDir.delete();
+      }
+    }
+  }
+
+  /**
+   * Creates the processor for {@code sg1} under the configured system directory and registers
+   * the INT64 / PLAIN / UNCOMPRESSED time series {@code g1s1}.
+   */
+  private void createSchemas() throws MetadataException, StorageGroupProcessorException {
+    String systemDir = IoTDBDescriptor.getInstance().getConfig().getSystemDir();
+    virtualStorageGroupProcessor =
+        new VirtualStorageGroupProcessor(systemDir, sg1, new DirectFlushPolicy(), sg1);
+
+    PartialPath seriesPath = new PartialPath(g1s1);
+    IoTDB.metaManager.createTimeseries(
+        seriesPath,
+        TSDataType.INT64,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        Collections.emptyMap());
+  }
+
+  /**
+   * Inserts 2000 rows for measurement "s1" through the processor. Offsets 1000..1999 are written
+   * first and offsets 0..999 (earlier timestamps) afterwards; the working processors are closed
+   * every time {@code (offset + 1) % 300 == 0}.
+   */
+  private void prepareData()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    InsertRowPlan rowPlan = new InsertRowPlan();
+    rowPlan.setDevicePath(new PartialPath(sg1));
+    rowPlan.setTime(System.currentTimeMillis());
+    rowPlan.setMeasurements(new String[] {"s1"});
+    rowPlan.setDataTypes(new TSDataType[] {TSDataType.INT64});
+    rowPlan.setValues(new Object[] {1L});
+
+    MeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN);
+    IMeasurementMNode[] measurementNodes = {
+      MeasurementMNode.getMeasurementMNode(null, "s1", schema, null)
+    };
+    rowPlan.setMeasurementMNodes(measurementNodes);
+    rowPlan.transferType();
+
+    // every timestamp is expressed relative to this origin
+    long origin = System.currentTimeMillis() - 2000;
+    // sequence data
+    insertOffsets(rowPlan, origin, 1000, 2000);
+    // unsequence data
+    insertOffsets(rowPlan, origin, 0, 1000);
+  }
+
+  /**
+   * Inserts one row per offset in {@code [fromOffset, toOffset)} at time {@code origin + offset},
+   * reusing {@code rowPlan}, and closes the working processors after every offset for which
+   * {@code (offset + 1) % 300 == 0}.
+   */
+  private void insertOffsets(InsertRowPlan rowPlan, long origin, int fromOffset, int toOffset)
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    for (int offset = fromOffset; offset < toOffset; offset++) {
+      rowPlan.setTime(origin + offset);
+      virtualStorageGroupProcessor.insert(rowPlan);
+      if ((offset + 1) % 300 == 0) {
+        virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+      }
+    }
+  }
+
+  /**
+   * Writes data that ends up in four sequence and four unsequence tsfiles, runs an archiving
+   * task (constructed with 500 and 0 as its last two arguments) against the processor and checks
+   * that all eight tsfiles are accounted for: at most two left in the sequence directory, none
+   * in the unsequence directory, and the rest directly inside {@code targetDir}.
+   */
+  @Test
+  public void testArchiving()
+      throws StorageEngineException, WriteProcessException, QueryProcessException,
+          IllegalPathException, IOException {
+    prepareData();
+
+    virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+
+    // files before ttl
+    File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
+    File unseqDir = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), sg1);
+
+    assertEquals(4, collectTsFiles(seqDir).size());
+    assertEquals(4, collectTsFiles(unseqDir).size());
+
+    // NOTE(review): presumably waits so the written rows are older than the 500 passed to the
+    // task below — confirm that argument is the ttl in milliseconds
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      // keep the interrupt visible and stop: the wait the rest of the test relies on was cut short
+      Thread.currentThread().interrupt();
+      fail("interrupted during the 500 ms wait before archiving");
+    }
+
+    // create a new ArchivingTask with specified params
+    ArchivingTask task = new ArchivingTask(0, new PartialPath(sg1), targetDir, 500, 0);
+    try {
+      task.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+      task.startTask();
+      virtualStorageGroupProcessor.checkArchivingTask(task);
+    } finally {
+      // always release the task, even when the check above throws
+      task.close();
+    }
+
+    // files after archiving
+    List<File> seqFiles = collectTsFiles(seqDir);
+    List<File> unseqFiles = collectTsFiles(unseqDir);
+
+    assertTrue(seqFiles.size() <= 2);
+    assertEquals(0, unseqFiles.size());
+
+    List<File> targetFiles = new ArrayList<>();
+    for (File tsfile : listChildren(targetDir)) {
+      if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+        targetFiles.add(tsfile);
+      }
+    }
+
+    assertEquals(8, targetFiles.size() + seqFiles.size() + unseqFiles.size());
+  }
+
+  /**
+   * Collects the tsfiles located exactly two directory levels below {@code root}, i.e. files
+   * whose path ends with {@link TsFileConstant#TSFILE_SUFFIX} inside {@code root/<dir>/<dir>/}.
+   *
+   * @param root directory to scan; a missing or unreadable directory yields an empty list
+   * @return the matching tsfiles themselves (not their parent directories)
+   */
+  private static List<File> collectTsFiles(File root) {
+    List<File> tsFiles = new ArrayList<>();
+    for (File firstLevel : listChildren(root)) {
+      if (!firstLevel.isDirectory()) {
+        continue;
+      }
+      for (File secondLevel : listChildren(firstLevel)) {
+        if (!secondLevel.isDirectory()) {
+          continue;
+        }
+        for (File candidate : listChildren(secondLevel)) {
+          if (candidate.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+            tsFiles.add(candidate);
+          }
+        }
+      }
+    }
+    return tsFiles;
+  }
+
+  /** Returns the entries of {@code dir}, or an empty array when {@code listFiles()} yields null. */
+  private static File[] listChildren(File dir) {
+    File[] children = dir.listFiles();
+    return children == null ? new File[0] : children;
+  }
+
+  /**
+   * Parses SET ARCHIVING in its positional and key=value forms and CANCEL ARCHIVING by storage
+   * group and by task id, checking the fields of the resulting {@link SetArchivingPlan}.
+   */
+  @Test
+  public void testParseSetArchiving() throws QueryProcessException {
+    Planner sqlPlanner = new Planner();
+    String targetPath = targetDir.getPath();
+
+    // positional arguments
+    String positionalSql =
+        String.format("SET ARCHIVING TO %s 2023-01-01 10000 '%s'", sg1, targetPath);
+    SetArchivingPlan parsed = (SetArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(positionalSql);
+    assertEquals(sg1, parsed.getStorageGroup().getFullPath());
+    assertEquals(10000, parsed.getTTL());
+    assertEquals(startTime, parsed.getStartTime());
+    assertEquals(targetPath, parsed.getTargetDir().getPath());
+
+    // key=value arguments, given in a different order
+    String keywordSql =
+        String.format(
+            "SET ARCHIVING TO start_time=2023-01-01 storage_group=%s ttl=10000 target_dir='%s'",
+            sg1, targetPath);
+    parsed = (SetArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(keywordSql);
+    assertEquals(sg1, parsed.getStorageGroup().getFullPath());
+    assertEquals(10000, parsed.getTTL());
+    assertEquals(startTime, parsed.getStartTime());
+    assertEquals(targetPath, parsed.getTargetDir().getPath());
+
+    // cancel by storage group: ttl and start time come back as Long.MAX_VALUE
+    String cancelBySgSql = "CANCEL ARCHIVING ON " + sg2;
+    parsed = (SetArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(cancelBySgSql);
+    assertEquals(sg2, parsed.getStorageGroup().getFullPath());
+    assertEquals(Long.MAX_VALUE, parsed.getTTL());
+    assertEquals(Long.MAX_VALUE, parsed.getStartTime());
+
+    // cancel by task id
+    String cancelByIdSql = "CANCEL ARCHIVING 99";
+    parsed = (SetArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(cancelByIdSql);
+    assertEquals(99, parsed.getTaskId());
+    assertEquals(Long.MAX_VALUE, parsed.getTTL());
+    assertEquals(Long.MAX_VALUE, parsed.getStartTime());
+  }
+
+  /**
+   * Parses PAUSE / RESUME ARCHIVING, addressed by storage group and by task id, and checks the
+   * target and the pause flag of the resulting {@link PauseArchivingPlan}.
+   */
+  @Test
+  public void testParsePauseArchiving() throws QueryProcessException {
+    Planner sqlPlanner = new Planner();
+
+    // pause by storage group
+    String pauseBySgSql = "PAUSE ARCHIVING ON " + sg2;
+    PauseArchivingPlan parsed =
+        (PauseArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(pauseBySgSql);
+    assertEquals(sg2, parsed.getStorageGroup().getFullPath());
+    assertTrue(parsed.isPause());
+
+    // pause by task id
+    String pauseByIdSql = "PAUSE ARCHIVING 10";
+    parsed = (PauseArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(pauseByIdSql);
+    assertEquals(10, parsed.getTaskId());
+    assertTrue(parsed.isPause());
+
+    // resume by storage group
+    String resumeBySgSql = "RESUME ARCHIVING ON " + sg1;
+    parsed = (PauseArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(resumeBySgSql);
+    assertEquals(sg1, parsed.getStorageGroup().getFullPath());
+    assertFalse(parsed.isPause());
+
+    // resume by task id
+    String resumeByIdSql = "RESUME ARCHIVING 16";
+    parsed = (PauseArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(resumeByIdSql);
+    assertEquals(16, parsed.getTaskId());
+    assertFalse(parsed.isPause());
+  }
+
+  /**
+   * Parses SHOW ALL ARCHIVING (no storage groups in the plan) and SHOW ARCHIVING ON a storage
+   * group (that group is the plan's first entry).
+   */
+  @Test
+  public void testParseShowArchiving() throws QueryProcessException {
+    Planner sqlPlanner = new Planner();
+
+    String showAllSql = "SHOW ALL ARCHIVING";
+    ShowArchivingPlan parsed = (ShowArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(showAllSql);
+    assertTrue(parsed.getStorageGroups().isEmpty());
+
+    String showOneSql = "SHOW ARCHIVING ON " + sg1;
+    parsed = (ShowArchivingPlan) sqlPlanner.parseSQLToPhysicalPlan(showOneSql);
+    assertEquals(sg1, parsed.getStorageGroups().get(0).getFullPath());
+  }
+
+  /**
+   * Registers an archiving task for {@code sg1} and executes a show-archiving plan with an empty
+   * storage group list. Every returned row must belong to {@code sg1} (field 2) and carry the
+   * formatted start time (field 4), the ttl (field 5) and the target directory (field 6).
+   *
+   * <p>NOTE(review): the loop body never runs if the query returns no rows, so the test would
+   * pass vacuously in that case — consider asserting that at least one row was seen.
+   */
+  @Test
+  public void testShowArchiving()
+      throws IOException, QueryProcessException, QueryFilterOptimizationException,
+          StorageEngineException, MetadataException, InterruptedException {
+    ArchivingManager.getInstance().setArchiving(new PartialPath(sg1), targetDir, ttl, startTime);
+
+    ShowArchivingPlan showAll = new ShowArchivingPlan(Collections.emptyList());
+    QueryDataSet rows =
+        new PlanExecutor().processQuery(showAll, EnvironmentUtils.TEST_QUERY_CONTEXT);
+
+    while (rows.hasNext()) {
+      RowRecord row = rows.next();
+      String storageGroup = row.getFields().get(2).getStringValue();
+      if (!storageGroup.equals(sg1)) {
+        // a task for any other storage group is unexpected
+        fail();
+      }
+
+      ZonedDateTime expectedStart = DatetimeUtils.convertMillsecondToZonedDateTime(startTime);
+      String expectedStartText = DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(expectedStart);
+      assertEquals(expectedStartText, row.getFields().get(4).getStringValue());
+      assertEquals(ttl, row.getFields().get(5).getLongV());
+      assertEquals(targetDir.getPath(), row.getFields().get(6).getStringValue());
+    }
+  }
+
+  /**
+   * Checks that running an archiving task built with 0 and 0 as its last two arguments leaves
+   * the processor with no sequence and no unsequence tsfile resources, starting from four of
+   * each.
+   */
+  @Test
+  public void testArchivingCleanFile()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    prepareData();
+    virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+
+    // resources tracked by the processor before archiving
+    int seqBefore = virtualStorageGroupProcessor.getSequenceFileTreeSet().size();
+    assertEquals(4, seqBefore);
+    int unseqBefore = virtualStorageGroupProcessor.getUnSequenceFileList().size();
+    assertEquals(4, unseqBefore);
+
+    ArchivingTask archiveAll = new ArchivingTask(0, new PartialPath(sg1), targetDir, 0, 0);
+    archiveAll.setStatus(ArchivingTask.ArchivingTaskStatus.RUNNING);
+    virtualStorageGroupProcessor.checkArchivingTask(archiveAll);
+
+    // resources tracked by the processor after archiving
+    int seqAfter = virtualStorageGroupProcessor.getSequenceFileTreeSet().size();
+    assertEquals(0, seqAfter);
+    int unseqAfter = virtualStorageGroupProcessor.getUnSequenceFileList().size();
+    assertEquals(0, unseqAfter);
+    archiveAll.close();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a7bd78165b..b439387eef 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.SystemStatus;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.archiving.ArchivingManager;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
@@ -143,6 +144,9 @@ public class EnvironmentUtils {
fail();
}
+ // clear archiving manager
+ ArchivingManager.getInstance().close();
+
IoTDBDescriptor.getInstance().getConfig().setSystemStatus(SystemStatus.NORMAL);
// We must disable MQTT service as it will cost a lot of time to be shutdown, which may slow our
// unit tests.