You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/15 02:02:51 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4640] Allow setting the same data archiving task after canceling it (#7609)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 93c7c2e859 [To rel/0.13][IOTDB-4640] Allow setting the same data archiving task after canceling it (#7609)
93c7c2e859 is described below
commit 93c7c2e859506e52c56ee9b98a5c173d2a5eb38f
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Sat Oct 15 10:02:46 2022 +0800
[To rel/0.13][IOTDB-4640] Allow setting the same data archiving task after canceling it (#7609)
---
.../iotdb/db/integration/IoTDBArchivingIT.java | 44 +++++++++++++++++++++-
.../org/apache/iotdb/db/engine/StorageEngine.java | 9 +----
.../db/engine/archiving/ArchivingManager.java | 7 +++-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 22 ++++++++---
4 files changed, 67 insertions(+), 15 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
index 4963267f7d..fad6102139 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
@@ -293,7 +293,7 @@ public class IoTDBArchivingIT {
@Test
@Category({ClusterTest.class})
- public void testSetArchive() throws SQLException {
+ public void testSetArchiving() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
@@ -342,4 +342,46 @@ public class IoTDBArchivingIT {
}
}
}
+
+ @Test
+ @Category({ClusterTest.class})
+ public void testReSubmitArchiving() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
+
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG4");
+
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG4 3000-12-13 100 '" + testTargetDir.getPath() + "'");
+ ResultSet resultSet = statement.executeQuery("SHOW ARCHIVING");
+ boolean flag = false;
+ while (resultSet.next()) {
+ if (resultSet.getString(3).equals("root.ARCHIVING_SG4")) {
+ flag = true;
+ assertEquals("READY", resultSet.getString(4));
+ assertTrue(resultSet.getString(5).startsWith("3000-12-13"));
+ assertEquals(100, resultSet.getLong(6));
+ assertEquals(testTargetDir.getPath(), resultSet.getString(7));
+ }
+ }
+ assertTrue(flag);
+
+ statement.execute("CANCEL ARCHIVING 0");
+ statement.execute(
+ "SET ARCHIVING TO root.ARCHIVING_SG4 3000-12-13 100 '" + testTargetDir.getPath() + "'");
+ resultSet = statement.executeQuery("SHOW ARCHIVING");
+ flag = false;
+ while (resultSet.next()) {
+ if (resultSet.getString(3).equals("root.ARCHIVING_SG4")) {
+ flag = true;
+ assertEquals("READY", resultSet.getString(4));
+ assertTrue(resultSet.getString(5).startsWith("3000-12-13"));
+ assertEquals(100, resultSet.getLong(6));
+ assertEquals(testTargetDir.getPath(), resultSet.getString(7));
+ }
+ }
+ assertTrue(flag);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index e908e9a5e3..595a2b6935 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -1316,13 +1316,8 @@ public class StorageEngine implements IService {
}
/** push the archiving info to archivingManager */
- public void setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
- boolean result = archivingManager.setArchiving(storageGroup, targetDir, ttl, startTime);
- if (result) {
- logger.info("set archiving task successfully.");
- } else {
- logger.info("set archiving task failed.");
- }
+ public boolean setArchiving(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+ return archivingManager.setArchiving(storageGroup, targetDir, ttl, startTime);
}
public boolean operateArchiving(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
index 34e43bcd55..c83f875f38 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java
@@ -212,11 +212,14 @@ public class ArchivingManager {
// check if there are duplicates
for (ArchivingTask archivingTask : archivingTasks) {
- if (archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())
+ if (archivingTask.isActive()
+ && archivingTask.getStorageGroup().getFullPath().equals(storageGroup.getFullPath())
&& archivingTask.getTargetDir().equals(targetDir)
&& archivingTask.getTTL() == ttl
&& archivingTask.getStartTime() == startTime) {
- logger.warn("archiving task already equals archiving task {}", archivingTask.getTaskId());
+ logger.warn(
+ "Fail to set archiving task, it's same as the archiving task {}",
+ archivingTask.getTaskId());
return false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 5af46ebecd..a6989293c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1713,16 +1713,28 @@ public class PlanExecutor implements IPlanExecutor {
throw new QueryProcessException("Fail to cancel archiving task.");
}
} else {
+ List<PartialPath> storageGroupPaths;
try {
- List<PartialPath> storageGroupPaths =
+ storageGroupPaths =
IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup(), plan.isPrefixMatch());
- for (PartialPath storagePath : storageGroupPaths) {
- StorageEngine.getInstance()
- .setArchiving(storagePath, plan.getTargetDir(), plan.getTTL(), plan.getStartTime());
- }
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
+
+ List<String> failedStorageGroups = new ArrayList<>();
+ for (PartialPath storagePath : storageGroupPaths) {
+ boolean success =
+ StorageEngine.getInstance()
+ .setArchiving(storagePath, plan.getTargetDir(), plan.getTTL(), plan.getStartTime());
+ if (!success) {
+ failedStorageGroups.add(storagePath.getFullPath());
+ }
+ }
+ if (!failedStorageGroups.isEmpty()) {
+ throw new QueryProcessException(
+ String.format(
+ "Fail to set archiving tasks for %s", String.join(", ", failedStorageGroups)));
+ }
}
}