You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/14 11:39:45 UTC
[iotdb] branch IOTDB-4619 updated: Add IT for CQSnapshot
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new b42c96c6f8 Add IT for CQSnapshot
b42c96c6f8 is described below
commit b42c96c6f8500ddf04630a6de2e713d691183e7c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 14 19:39:38 2022 +0800
Add IT for CQSnapshot
---
.../iotdb/confignode/persistence/CQInfoTest.java | 9 ++-
.../confignode/IoTDBConfigNodeSnapshotIT.java | 82 +++++++++++++++++-----
2 files changed, 69 insertions(+), 22 deletions(-)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index c9d3bc6420..9d9b6105c8 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.persistence;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
@@ -56,7 +55,7 @@ public class CQInfoTest {
}
@Test
- public void testSnapshot() throws TException, IOException, IllegalPathException {
+ public void testSnapshot() throws TException, IOException {
long executionTime = System.currentTimeMillis();
AddCQPlan addCQPlan =
new AddCQPlan(
@@ -84,9 +83,9 @@ public class CQInfoTest {
0,
1000,
0,
- (byte) 0,
- "select s1 into root.backup.d2.s1 from root.sg.d1",
- "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d1 END",
+ (byte) 1,
+ "select s1 into root.backup.d2.s1 from root.sg.d2",
+ "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END",
"Asia"),
"testCq2_md5",
executionTime);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index 07e999db7d..731393b486 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -22,12 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cq.CQState;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -35,6 +38,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
@@ -61,8 +65,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
@@ -130,12 +138,14 @@ public class IoTDBConfigNodeSnapshotIT {
List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
+ Set<TCQEntry> expectedCQEntries = createCQs(client);
+
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
TSetStorageGroupReq setStorageGroupReq =
new TSetStorageGroupReq(new TStorageGroupSchema(storageGroup));
TSStatus status = client.setStorageGroup(setStorageGroupReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
for (int j = 0; j < seriesPartitionSlotsNum; j++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
@@ -146,14 +156,14 @@ public class IoTDBConfigNodeSnapshotIT {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
// All requests should success if snapshot success
- Assert.assertEquals(
+ assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
schemaPartitionTableResp.getStatus().getCode());
Assert.assertNotNull(schemaPartitionTableResp.getSchemaPartitionTable());
- Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
+ assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
Assert.assertNotNull(
schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup));
- Assert.assertEquals(
+ assertEquals(
1, schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup).size());
for (int k = 0; k < timePartitionSlotsNum; k++) {
@@ -171,20 +181,20 @@ public class IoTDBConfigNodeSnapshotIT {
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(dataPartitionReq);
// All requests should success if snapshot success
- Assert.assertEquals(
+ assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
dataPartitionTableResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- Assert.assertEquals(1, dataPartitionTableResp.getDataPartitionTableSize());
+ assertEquals(1, dataPartitionTableResp.getDataPartitionTableSize());
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable().get(storageGroup));
- Assert.assertEquals(
+ assertEquals(
1, dataPartitionTableResp.getDataPartitionTable().get(storageGroup).size());
Assert.assertNotNull(
dataPartitionTableResp
.getDataPartitionTable()
.get(storageGroup)
.get(seriesPartitionSlot));
- Assert.assertEquals(
+ assertEquals(
1,
dataPartitionTableResp
.getDataPartitionTable()
@@ -196,6 +206,10 @@ public class IoTDBConfigNodeSnapshotIT {
}
assertTriggerInformation(createTriggerReqs, client.getTriggerTable());
+
+ TShowCQResp showCQResp = client.showCQ();
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), showCQResp.getStatus().getCode());
+ assertEquals(expectedCQEntries, new HashSet<>(showCQResp.cqList));
}
}
@@ -242,10 +256,10 @@ public class IoTDBConfigNodeSnapshotIT {
.setJarMD5(jarMD5)
.setJarFile(jarFile);
- Assert.assertEquals(
+ assertEquals(
client.createTrigger(createTriggerReq1).getCode(),
TSStatusCode.SUCCESS_STATUS.getStatusCode());
- Assert.assertEquals(
+ assertEquals(
client.createTrigger(createTriggerReq2).getCode(),
TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -261,19 +275,53 @@ public class IoTDBConfigNodeSnapshotIT {
TriggerInformation triggerInformation =
TriggerInformation.deserialize(resp.getAllTriggerInformation().get(i));
- Assert.assertEquals(createTriggerReq.getTriggerName(), triggerInformation.getTriggerName());
- Assert.assertEquals(createTriggerReq.getClassName(), triggerInformation.getClassName());
- Assert.assertEquals(createTriggerReq.getJarPath(), triggerInformation.getJarName());
- Assert.assertEquals(
- createTriggerReq.getTriggerEvent(), triggerInformation.getEvent().getId());
- Assert.assertEquals(
+ assertEquals(createTriggerReq.getTriggerName(), triggerInformation.getTriggerName());
+ assertEquals(createTriggerReq.getClassName(), triggerInformation.getClassName());
+ assertEquals(createTriggerReq.getJarPath(), triggerInformation.getJarName());
+ assertEquals(createTriggerReq.getTriggerEvent(), triggerInformation.getEvent().getId());
+ assertEquals(
createTriggerReq.getTriggerType(),
triggerInformation.isStateful()
? TriggerType.STATEFUL.getId()
: TriggerType.STATELESS.getId());
- Assert.assertEquals(
+ assertEquals(
PathDeserializeUtil.deserialize(ByteBuffer.wrap(createTriggerReq.getPathPattern())),
triggerInformation.getPathPattern());
}
}
+
+ private Set<TCQEntry> createCQs(SyncConfigNodeIServiceClient client) throws TException {
+ String sql1 = "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END";
+ String sql2 = "create cq testCq1 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END";
+ TCreateCQReq req1 =
+ new TCreateCQReq(
+ "testCq1",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 0,
+ "select s1 into root.backup.d1.s1 from root.sg.d1",
+ sql1,
+ "Asia");
+ TCreateCQReq req2 =
+ new TCreateCQReq(
+ "testCq2",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 1,
+ "select s1 into root.backup.d2.s1 from root.sg.d2",
+ sql2,
+ "Asia");
+
+ assertEquals(client.createCQ(req1).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ assertEquals(client.createCQ(req2).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
+ Set<TCQEntry> result = new HashSet<>();
+ result.add(new TCQEntry("testCq1", sql1, CQState.ACTIVE.getType()));
+ result.add(new TCQEntry("testCq2", sql2, CQState.ACTIVE.getType()));
+ return result;
+ }
}