You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/14 11:39:45 UTC

[iotdb] branch IOTDB-4619 updated: Add IT for CQSnapshot

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new b42c96c6f8 Add IT for CQSnapshot
b42c96c6f8 is described below

commit b42c96c6f8500ddf04630a6de2e713d691183e7c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 14 19:39:38 2022 +0800

    Add IT for CQSnapshot
---
 .../iotdb/confignode/persistence/CQInfoTest.java   |  9 ++-
 .../confignode/IoTDBConfigNodeSnapshotIT.java      | 82 +++++++++++++++++-----
 2 files changed, 69 insertions(+), 22 deletions(-)

diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index c9d3bc6420..9d9b6105c8 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.confignode.persistence;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
@@ -56,7 +55,7 @@ public class CQInfoTest {
   }
 
   @Test
-  public void testSnapshot() throws TException, IOException, IllegalPathException {
+  public void testSnapshot() throws TException, IOException {
     long executionTime = System.currentTimeMillis();
     AddCQPlan addCQPlan =
         new AddCQPlan(
@@ -84,9 +83,9 @@ public class CQInfoTest {
                 0,
                 1000,
                 0,
-                (byte) 0,
-                "select s1 into root.backup.d2.s1 from root.sg.d1",
-                "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d1 END",
+                (byte) 1,
+                "select s1 into root.backup.d2.s1 from root.sg.d2",
+                "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END",
                 "Asia"),
             "testCq2_md5",
             executionTime);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index 07e999db7d..731393b486 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -22,12 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cq.CQState;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -35,6 +38,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.it.env.ConfigFactory;
 import org.apache.iotdb.it.env.EnvFactory;
@@ -61,8 +65,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
@@ -130,12 +138,14 @@ public class IoTDBConfigNodeSnapshotIT {
 
       List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
 
+      Set<TCQEntry> expectedCQEntries = createCQs(client);
+
       for (int i = 0; i < storageGroupNum; i++) {
         String storageGroup = sg + i;
         TSetStorageGroupReq setStorageGroupReq =
             new TSetStorageGroupReq(new TStorageGroupSchema(storageGroup));
         TSStatus status = client.setStorageGroup(setStorageGroupReq);
-        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+        assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
         for (int j = 0; j < seriesPartitionSlotsNum; j++) {
           TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
@@ -146,14 +156,14 @@ public class IoTDBConfigNodeSnapshotIT {
           TSchemaPartitionTableResp schemaPartitionTableResp =
               client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
           // All requests should success if snapshot success
-          Assert.assertEquals(
+          assertEquals(
               TSStatusCode.SUCCESS_STATUS.getStatusCode(),
               schemaPartitionTableResp.getStatus().getCode());
           Assert.assertNotNull(schemaPartitionTableResp.getSchemaPartitionTable());
-          Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
+          assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
           Assert.assertNotNull(
               schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup));
-          Assert.assertEquals(
+          assertEquals(
               1, schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup).size());
 
           for (int k = 0; k < timePartitionSlotsNum; k++) {
@@ -171,20 +181,20 @@ public class IoTDBConfigNodeSnapshotIT {
             TDataPartitionTableResp dataPartitionTableResp =
                 client.getOrCreateDataPartitionTable(dataPartitionReq);
             // All requests should success if snapshot success
-            Assert.assertEquals(
+            assertEquals(
                 TSStatusCode.SUCCESS_STATUS.getStatusCode(),
                 dataPartitionTableResp.getStatus().getCode());
             Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-            Assert.assertEquals(1, dataPartitionTableResp.getDataPartitionTableSize());
+            assertEquals(1, dataPartitionTableResp.getDataPartitionTableSize());
             Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable().get(storageGroup));
-            Assert.assertEquals(
+            assertEquals(
                 1, dataPartitionTableResp.getDataPartitionTable().get(storageGroup).size());
             Assert.assertNotNull(
                 dataPartitionTableResp
                     .getDataPartitionTable()
                     .get(storageGroup)
                     .get(seriesPartitionSlot));
-            Assert.assertEquals(
+            assertEquals(
                 1,
                 dataPartitionTableResp
                     .getDataPartitionTable()
@@ -196,6 +206,10 @@ public class IoTDBConfigNodeSnapshotIT {
       }
 
       assertTriggerInformation(createTriggerReqs, client.getTriggerTable());
+
+      TShowCQResp showCQResp = client.showCQ();
+      assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), showCQResp.getStatus().getCode());
+      assertEquals(expectedCQEntries, new HashSet<>(showCQResp.cqList));
     }
   }
 
@@ -242,10 +256,10 @@ public class IoTDBConfigNodeSnapshotIT {
             .setJarMD5(jarMD5)
             .setJarFile(jarFile);
 
-    Assert.assertEquals(
+    assertEquals(
         client.createTrigger(createTriggerReq1).getCode(),
         TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    Assert.assertEquals(
+    assertEquals(
         client.createTrigger(createTriggerReq2).getCode(),
         TSStatusCode.SUCCESS_STATUS.getStatusCode());
 
@@ -261,19 +275,53 @@ public class IoTDBConfigNodeSnapshotIT {
       TriggerInformation triggerInformation =
           TriggerInformation.deserialize(resp.getAllTriggerInformation().get(i));
 
-      Assert.assertEquals(createTriggerReq.getTriggerName(), triggerInformation.getTriggerName());
-      Assert.assertEquals(createTriggerReq.getClassName(), triggerInformation.getClassName());
-      Assert.assertEquals(createTriggerReq.getJarPath(), triggerInformation.getJarName());
-      Assert.assertEquals(
-          createTriggerReq.getTriggerEvent(), triggerInformation.getEvent().getId());
-      Assert.assertEquals(
+      assertEquals(createTriggerReq.getTriggerName(), triggerInformation.getTriggerName());
+      assertEquals(createTriggerReq.getClassName(), triggerInformation.getClassName());
+      assertEquals(createTriggerReq.getJarPath(), triggerInformation.getJarName());
+      assertEquals(createTriggerReq.getTriggerEvent(), triggerInformation.getEvent().getId());
+      assertEquals(
           createTriggerReq.getTriggerType(),
           triggerInformation.isStateful()
               ? TriggerType.STATEFUL.getId()
               : TriggerType.STATELESS.getId());
-      Assert.assertEquals(
+      assertEquals(
           PathDeserializeUtil.deserialize(ByteBuffer.wrap(createTriggerReq.getPathPattern())),
           triggerInformation.getPathPattern());
     }
   }
+
+  /**
+   * Creates two CQs (testCq1 and testCq2) through the given ConfigNode client and
+   * returns the entries that a later {@code showCQ} call is expected to list for them.
+   *
+   * <p>Both createCQ responses are asserted to carry SUCCESS_STATUS before the set is built.
+   *
+   * @param client ConfigNode client used to send the createCQ requests
+   * @return the expected {@link TCQEntry} set, one ACTIVE entry per created CQ
+   * @throws TException propagated from the client's createCQ calls
+   */
+  private Set<TCQEntry> createCQs(SyncConfigNodeIServiceClient client) throws TException {
+    String query1 = "select s1 into root.backup.d1.s1 from root.sg.d1";
+    String query2 = "select s1 into root.backup.d2.s1 from root.sg.d2";
+
+    // Each statement text must name the CQ it is registered under.
+    String sql1 = "create cq testCq1 BEGIN " + query1 + " END";
+    String sql2 = "create cq testCq2 BEGIN " + query2 + " END";
+
+    TCreateCQReq req1 =
+        new TCreateCQReq("testCq1", 1000, 0, 1000, 0, (byte) 0, query1, sql1, "Asia");
+    TCreateCQReq req2 =
+        new TCreateCQReq("testCq2", 1000, 0, 1000, 0, (byte) 1, query2, sql2, "Asia");
+
+    // JUnit's assertEquals takes (expected, actual); this order keeps failure messages accurate.
+    int success = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    assertEquals(success, client.createCQ(req1).getCode());
+    assertEquals(success, client.createCQ(req2).getCode());
+
+    // Entries showCQ should report once both CQs exist.
+    Set<TCQEntry> result = new HashSet<>();
+    result.add(new TCQEntry("testCq1", sql1, CQState.ACTIVE.getType()));
+    result.add(new TCQEntry("testCq2", sql2, CQState.ACTIVE.getType()));
+    return result;
+  }
 }