You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/27 04:24:12 UTC

[iotdb] branch IOTDB-4619 updated: Fix CI by adding username

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new ad322e8b42 Fix CI by adding username
ad322e8b42 is described below

commit ad322e8b42e4f8d6b5b554038fefe34c3e836523
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 27 12:23:32 2022 +0800

    Fix CI by adding username
---
 .../confignode/manager/cq/CQScheduleTask.java      |   9 +-
 .../iotdb/confignode/persistence/cq/CQInfo.java    |  18 ++++
 .../request/ConfigPhysicalPlanSerDeTest.java       |   4 +-
 .../iotdb/confignode/persistence/CQInfoTest.java   |   6 +-
 docs/UserGuide/Process-Data/Continuous-Query.md    |   8 +-
 docs/zh/UserGuide/Process-Data/Continuous-Query.md |   8 +-
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   6 +-
 .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java   |  69 +++++++------
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  | 114 +--------------------
 .../iotdb/db/mpp/common/MPPQueryContext.java       |   4 +
 .../mpp/plan/execution/config/ConfigExecution.java |   5 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |  19 +++-
 .../config/executor/ClusterConfigTaskExecutor.java |   6 +-
 .../config/executor/IConfigTaskExecutor.java       |   2 +-
 .../executor/StandaloneConfigTaskExecutor.java     |   2 +-
 .../config/metadata/CreateContinuousQueryTask.java |  11 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  24 +++--
 .../src/main/thrift/confignode.thrift              |   1 +
 thrift/src/main/thrift/datanode.thrift             |   1 +
 19 files changed, 144 insertions(+), 173 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index fdee430730..f60bc09080 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -54,6 +54,9 @@ public class CQScheduleTask implements Runnable {
   private final String md5;
 
   private final String zoneId;
+
+  private final String username;
+
   private final ScheduledExecutorService executor;
 
   private final ConfigManager configManager;
@@ -77,6 +80,7 @@ public class CQScheduleTask implements Runnable {
         req.queryBody,
         md5,
         req.zoneId,
+        req.username,
         executor,
         configManager,
         firstExecutionTime);
@@ -93,6 +97,7 @@ public class CQScheduleTask implements Runnable {
         entry.getQueryBody(),
         entry.getMd5(),
         entry.getZoneId(),
+        entry.getUsername(),
         executor,
         configManager,
         entry.getLastExecutionTime() + entry.getEveryInterval());
@@ -107,6 +112,7 @@ public class CQScheduleTask implements Runnable {
       String queryBody,
       String md5,
       String zoneId,
+      String username,
       ScheduledExecutorService executor,
       ConfigManager configManager,
       long executionTime) {
@@ -118,6 +124,7 @@ public class CQScheduleTask implements Runnable {
     this.queryBody = queryBody;
     this.md5 = md5;
     this.zoneId = zoneId;
+    this.username = username;
     this.executor = executor;
     this.configManager = configManager;
     this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
@@ -160,7 +167,7 @@ public class CQScheduleTask implements Runnable {
           endTime,
           System.currentTimeMillis());
       TExecuteCQ executeCQReq =
-          new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
+          new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId, username);
       try {
         AsyncDataNodeInternalServiceClient client =
             AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index e0db60e859..e90c7167d0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -114,12 +114,15 @@ public class CQInfo implements SnapshotProcessor {
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format("CQ %s doesn't exist.", cqId);
+        LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
       } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format("MD5 of CQ %s doesn't match", cqId);
+        LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
       } else {
         cqMap.remove(cqId);
         res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+        LOGGER.info("Drop CQ {} successfully.", cqId);
       }
       return res;
     } finally {
@@ -291,6 +294,8 @@ public class CQInfo implements SnapshotProcessor {
 
     private final String zoneId;
 
+    private final String username;
+
     private CQState state;
     private long lastExecutionTime;
 
@@ -306,6 +311,7 @@ public class CQInfo implements SnapshotProcessor {
           req.sql,
           md5,
           req.zoneId,
+          req.username,
           CQState.INACTIVE,
           lastExecutionTime);
     }
@@ -322,6 +328,7 @@ public class CQInfo implements SnapshotProcessor {
           other.sql,
           other.md5,
           other.zoneId,
+          other.username,
           other.state,
           other.lastExecutionTime);
     }
@@ -337,6 +344,7 @@ public class CQInfo implements SnapshotProcessor {
         String sql,
         String md5,
         String zoneId,
+        String username,
         CQState state,
         long lastExecutionTime) {
       this.cqId = cqId;
@@ -349,6 +357,7 @@ public class CQInfo implements SnapshotProcessor {
       this.sql = sql;
       this.md5 = md5;
       this.zoneId = zoneId;
+      this.username = username;
       this.state = state;
       this.lastExecutionTime = lastExecutionTime;
     }
@@ -364,6 +373,7 @@ public class CQInfo implements SnapshotProcessor {
       ReadWriteIOUtils.write(sql, stream);
       ReadWriteIOUtils.write(md5, stream);
       ReadWriteIOUtils.write(zoneId, stream);
+      ReadWriteIOUtils.write(username, stream);
       ReadWriteIOUtils.write(state.getType(), stream);
       ReadWriteIOUtils.write(lastExecutionTime, stream);
     }
@@ -379,6 +389,7 @@ public class CQInfo implements SnapshotProcessor {
       String sql = ReadWriteIOUtils.readString(stream);
       String md5 = ReadWriteIOUtils.readString(stream);
       String zoneId = ReadWriteIOUtils.readString(stream);
+      String username = ReadWriteIOUtils.readString(stream);
       CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
       long lastExecutionTime = ReadWriteIOUtils.readLong(stream);
       return new CQEntry(
@@ -392,6 +403,7 @@ public class CQInfo implements SnapshotProcessor {
           sql,
           md5,
           zoneId,
+          username,
           state,
           lastExecutionTime);
     }
@@ -444,6 +456,10 @@ public class CQInfo implements SnapshotProcessor {
       return zoneId;
     }
 
+    public String getUsername() {
+      return username;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) return true;
@@ -460,6 +476,7 @@ public class CQInfo implements SnapshotProcessor {
           && Objects.equals(sql, cqEntry.sql)
           && Objects.equals(md5, cqEntry.md5)
           && Objects.equals(zoneId, cqEntry.zoneId)
+          && Objects.equals(username, cqEntry.username)
           && state == cqEntry.state;
     }
 
@@ -476,6 +493,7 @@ public class CQInfo implements SnapshotProcessor {
           sql,
           md5,
           zoneId,
+          username,
           state,
           lastExecutionTime);
     }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index fa45da1f5d..544f0b4895 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1056,7 +1056,8 @@ public class ConfigPhysicalPlanSerDeTest {
                 (byte) 0,
                 "select s1 into root.backup.d1.s1 from root.sg.d1",
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
-                "Asia"),
+                "Asia",
+                "root"),
             "testCq1_md5",
             executionTime);
     AddCQPlan addCQPlan1 =
@@ -1098,6 +1099,7 @@ public class ConfigPhysicalPlanSerDeTest {
     Assert.assertEquals(updateCQLastExecTimePlan0, updateCQLastExecTimePlan1);
   }
 
+  @Test
   public void GetTriggerJarPlanTest() throws IOException {
     List<String> jarNames = new ArrayList<>();
     jarNames.add("test1");
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 9d9b6105c8..9780e4fefe 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -68,7 +68,8 @@ public class CQInfoTest {
                 (byte) 0,
                 "select s1 into root.backup.d1.s1 from root.sg.d1",
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
-                "Asia"),
+                "Asia",
+                "root"),
             "testCq1_md5",
             executionTime);
 
@@ -86,7 +87,8 @@ public class CQInfoTest {
                 (byte) 1,
                 "select s1 into root.backup.d2.s1 from root.sg.d2",
                 "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END",
-                "Asia"),
+                "Asia",
+                "root"),
             "testCq2_md5",
             executionTime);
     cqInfo.addCQ(addCQPlan);
diff --git a/docs/UserGuide/Process-Data/Continuous-Query.md b/docs/UserGuide/Process-Data/Continuous-Query.md
index b7f727bb0c..b7a73514aa 100644
--- a/docs/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/UserGuide/Process-Data/Continuous-Query.md
@@ -125,7 +125,7 @@ BEGIN
 SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
 END
 ```
 
@@ -182,7 +182,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
 END
 ```
 
@@ -257,7 +257,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
   FILL(100.0)
 END
 ```
@@ -321,7 +321,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
   FILL(100.0)
 END
 ```
diff --git a/docs/zh/UserGuide/Process-Data/Continuous-Query.md b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
index 4527776535..ec18957052 100644
--- a/docs/zh/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
@@ -126,7 +126,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
 END
 ```
 
@@ -184,7 +184,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
 END
 ```
 
@@ -259,7 +259,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
   FILL(100.0)
 END
 ```
@@ -324,7 +324,7 @@ BEGIN
   SELECT max_value(temperature)
   INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
-  GROUP BY time(10s)
+  GROUP BY(10s)
   FILL(100.0)
 END
 ```
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index c896d4379b..c822736463 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -302,7 +302,8 @@ public class IoTDBConfigNodeSnapshotIT {
             (byte) 0,
             "select s1 into root.backup.d1(s1) from root.sg.d1",
             sql1,
-            "Asia");
+            "Asia",
+            "root");
     TCreateCQReq req2 =
         new TCreateCQReq(
             "testCq2",
@@ -313,7 +314,8 @@ public class IoTDBConfigNodeSnapshotIT {
             (byte) 1,
             "select s1 into root.backup.d2(s1) from root.sg.d2",
             sql2,
-            "Asia");
+            "Asia",
+            "root");
 
     assertEquals(client.createCQ(req1).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
     assertEquals(client.createCQ(req2).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
index 48d1b404ff..7aaecba327 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -96,7 +96,7 @@ public class IoTDBCQExecIT {
               + "  SELECT max_value(s1) \n"
               + "  INTO root.sg.d1(s1_max)\n"
               + "  FROM root.sg.d1\n"
-              + "  GROUP BY time(1s) \n"
+              + "  GROUP BY(1s) \n"
               + "END");
 
       long targetTime = firstExecutionTime + 5_000;
@@ -118,10 +118,10 @@ public class IoTDBCQExecIT {
           cnt++;
         }
         assertEquals(expectedTime.length, cnt);
+      } finally {
+        statement.execute("DROP CQ cq1");
       }
 
-      statement.execute("DROP CQ cq1");
-
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -136,7 +136,7 @@ public class IoTDBCQExecIT {
         Statement statement = connection.createStatement()) {
       long now = System.currentTimeMillis();
       long firstExecutionTime = now + 10_000;
-      long startTime = firstExecutionTime - 4_000;
+      long startTime = firstExecutionTime - 3_000;
 
       statement.execute("create timeseries root.sg.d2.s1 WITH DATATYPE=INT64");
       statement.execute("create timeseries root.sg.d2.s1_max WITH DATATYPE=INT64");
@@ -174,7 +174,7 @@ public class IoTDBCQExecIT {
               + "  SELECT max_value(s1) \n"
               + "  INTO root.sg.d2(s1_max)\n"
               + "  FROM root.sg.d2\n"
-              + "  GROUP BY time(1s) \n"
+              + "  GROUP BY(1s) \n"
               + "END");
 
       long targetTime = firstExecutionTime + 5_000;
@@ -196,10 +196,10 @@ public class IoTDBCQExecIT {
           cnt++;
         }
         assertEquals(expectedTime.length, cnt);
+      } finally {
+        statement.execute("DROP CQ cq2");
       }
 
-      statement.execute("DROP CQ cq2");
-
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -214,7 +214,7 @@ public class IoTDBCQExecIT {
         Statement statement = connection.createStatement()) {
       long now = System.currentTimeMillis();
       long firstExecutionTime = now + 10_000;
-      long startTime = firstExecutionTime - 4_000;
+      long startTime = firstExecutionTime - 3_000;
 
       statement.execute("create timeseries root.sg.d3.s1 WITH DATATYPE=INT64");
       statement.execute("create timeseries root.sg.d3.s1_max WITH DATATYPE=INT64");
@@ -245,14 +245,14 @@ public class IoTDBCQExecIT {
 
       statement.execute(
           "CREATE CONTINUOUS QUERY cq3\n"
-              + "RESAMPLE EVERY 20s\n"
+              + "RESAMPLE EVERY 2s\n"
               + String.format("BOUNDARY %d\n", firstExecutionTime)
-              + "RANGE 40s\n"
+              + "RANGE 4s\n"
               + "BEGIN \n"
               + "  SELECT max_value(s1) \n"
               + "  INTO root.sg.d3(s1_max)\n"
               + "  FROM root.sg.d3\n"
-              + "  GROUP BY time(1s) \n"
+              + "  GROUP BY(1s) \n"
               + "  FILL(100)\n"
               + "END");
 
@@ -272,7 +272,12 @@ public class IoTDBCQExecIT {
       };
       long[] expectedValue = {100, 2, 4, 6, 8, 10};
 
-      try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d3")) {
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select s1_max from root.sg.d3 where time between "
+                  + (startTime - 1_000)
+                  + " and "
+                  + (startTime + 4_000))) {
         int cnt = 0;
         while (resultSet.next()) {
           assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
@@ -280,10 +285,10 @@ public class IoTDBCQExecIT {
           cnt++;
         }
         assertEquals(expectedTime.length, cnt);
+      } finally {
+        statement.execute("DROP CQ cq3");
       }
 
-      statement.execute("DROP CQ cq3");
-
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -298,7 +303,7 @@ public class IoTDBCQExecIT {
         Statement statement = connection.createStatement()) {
       long now = System.currentTimeMillis();
       long firstExecutionTime = now + 10_000;
-      long startTime = firstExecutionTime - 4_000;
+      long startTime = firstExecutionTime - 3_000;
 
       statement.execute("create timeseries root.sg.d4.s1 WITH DATATYPE=INT64");
       statement.execute("create timeseries root.sg.d4.s1_max WITH DATATYPE=INT64");
@@ -329,14 +334,14 @@ public class IoTDBCQExecIT {
 
       statement.execute(
           "CREATE CONTINUOUS QUERY cq4\n"
-              + "RESAMPLE EVERY 20s\n"
+              + "RESAMPLE EVERY 2s\n"
               + String.format("BOUNDARY %d\n", firstExecutionTime)
-              + "RANGE 20s, 10s\n"
+              + "RANGE 2s, 1s\n"
               + "BEGIN \n"
               + "  SELECT max_value(s1) \n"
               + "  INTO root.sg.d4(s1_max)\n"
               + "  FROM root.sg.d4\n"
-              + "  GROUP BY time(1s) \n"
+              + "  GROUP BY(1s) \n"
               + "END");
 
       long targetTime = firstExecutionTime + 5_000;
@@ -345,8 +350,8 @@ public class IoTDBCQExecIT {
         TimeUnit.SECONDS.sleep(1);
       }
 
-      long[] expectedTime = {startTime + 2_000, startTime + 4_000};
-      long[] expectedValue = {6, 10};
+      long[] expectedTime = {startTime + 1_000, startTime + 3_000};
+      long[] expectedValue = {4, 8};
 
       try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d4")) {
         int cnt = 0;
@@ -356,10 +361,10 @@ public class IoTDBCQExecIT {
           cnt++;
         }
         assertEquals(expectedTime.length, cnt);
+      } finally {
+        statement.execute("DROP CQ cq4");
       }
 
-      statement.execute("DROP CQ cq4");
-
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -374,10 +379,10 @@ public class IoTDBCQExecIT {
         Statement statement = connection.createStatement()) {
       long now = System.currentTimeMillis();
       long firstExecutionTime = now + 10_000;
-      long startTime = firstExecutionTime - 4_000;
+      long startTime = firstExecutionTime - 3_000;
 
       statement.execute("create timeseries root.sg.d5.s1 WITH DATATYPE=INT64");
-      statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=DOUBLE");
 
       statement.execute(
           String.format(
@@ -407,6 +412,7 @@ public class IoTDBCQExecIT {
           "CREATE CONTINUOUS QUERY cq5\n"
               + "RESAMPLE EVERY 2s\n"
               + String.format("BOUNDARY %d\n", firstExecutionTime)
+              + "RANGE 4s\n"
               + "BEGIN \n"
               + "  SELECT s1 + 1 \n"
               + "  INTO root.sg.d5(precalculated_s1)\n"
@@ -432,21 +438,26 @@ public class IoTDBCQExecIT {
         startTime + 4_000,
         startTime + 4_500
       };
-      long[] expectedValue = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
+      double[] expectedValue = {2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0};
 
       try (ResultSet resultSet =
-          statement.executeQuery("select precalculated_s1 from root.sg.d5")) {
+          statement.executeQuery(
+              "select precalculated_s1 from root.sg.d5 where time between "
+                  + startTime
+                  + " and "
+                  + (startTime + 4_500))) {
         int cnt = 0;
         while (resultSet.next()) {
           assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
-          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d5.precalculated_s1"));
+          assertEquals(
+              expectedValue[cnt], resultSet.getDouble("root.sg.d5.precalculated_s1"), 0.00001);
           cnt++;
         }
         assertEquals(expectedTime.length, cnt);
+      } finally {
+        statement.execute("DROP CQ cq5");
       }
 
-      statement.execute("DROP CQ cq5");
-
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
index 7910ffeae5..84aff4a58a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -430,60 +430,6 @@ public class IoTDBCQIT {
             + "    GROUP BY(10m)\n"
             + "END"
       };
-      String[] formattedCqSQLs = {
-        "CREATE CQ show_cq_1\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms, 600000ms\n"
-            + "TIMEOUT POLICY BLOCKED\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (1800000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ show_cq_2\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY BLOCKED\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (1800000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ show_cq_3\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 600000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY DISCARD\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (600000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ show_cq_4\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY DISCARD\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (600000ms)\n"
-            + "END\n"
-            + ";"
-      };
 
       for (String sql : cqSQLs) {
         statement.execute(sql);
@@ -495,7 +441,7 @@ public class IoTDBCQIT {
         while (resultSet.next()) {
           // No need to add time column for aggregation query
           assertEquals(cqIds[cnt], resultSet.getString(1));
-          assertEquals(formattedCqSQLs[cnt], resultSet.getString(2));
+          assertEquals(cqSQLs[cnt], resultSet.getString(2));
           assertEquals("ACTIVE", resultSet.getString(3));
           cnt++;
         }
@@ -558,60 +504,6 @@ public class IoTDBCQIT {
             + "    GROUP BY(10m)\n"
             + "END"
       };
-      String[] formattedCqSQLs = {
-        "CREATE CQ drop_cq_1\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms, 600000ms\n"
-            + "TIMEOUT POLICY BLOCKED\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (1800000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ drop_cq_2\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY BLOCKED\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (1800000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ drop_cq_3\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 600000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY DISCARD\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (600000ms)\n"
-            + "END\n"
-            + ";",
-        "CREATE CQ drop_cq_4\n"
-            + "RESAMPLE\n"
-            + "\tEVERY 1800000ms\n"
-            + "\tBOUNDARY 0\n"
-            + "\tRANGE 1800000ms\n"
-            + "TIMEOUT POLICY DISCARD\n"
-            + "BEGIN\n"
-            + "\tSELECT count(s1)\n"
-            + "\t\tINTO root.sg_count.d(count_s1)\n"
-            + "\t\tFROM root.sg.d\n"
-            + "\t\tGROUP BY TIME (600000ms)\n"
-            + "END\n"
-            + ";"
-      };
 
       for (String sql : cqSQLs) {
         statement.execute(sql);
@@ -623,7 +515,7 @@ public class IoTDBCQIT {
         while (resultSet.next()) {
           // No need to add time column for aggregation query
           assertEquals(cqIds[cnt], resultSet.getString(1));
-          assertEquals(formattedCqSQLs[cnt], resultSet.getString(2));
+          assertEquals(cqSQLs[cnt], resultSet.getString(2));
           assertEquals("ACTIVE", resultSet.getString(3));
           cnt++;
         }
@@ -641,7 +533,7 @@ public class IoTDBCQIT {
         while (resultSet.next()) {
           // No need to add time column for aggregation query
           assertEquals(cqIds[resultIndex[cnt]], resultSet.getString(1));
-          assertEquals(formattedCqSQLs[resultIndex[cnt]], resultSet.getString(2));
+          assertEquals(cqSQLs[resultIndex[cnt]], resultSet.getString(2));
           assertEquals("ACTIVE", resultSet.getString(3));
           cnt++;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 57e201ecd7..5fecb985ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -144,4 +144,8 @@ public class MPPQueryContext {
   public TypeProvider getTypeProvider() {
     return typeProvider;
   }
+
+  public String getSql() {
+    return sql;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index c73ec144a1..4d97d9dd2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -75,7 +75,10 @@ public class ConfigExecution implements IQueryExecution {
     this.task =
         statement.accept(
             new ConfigTaskVisitor(),
-            new ConfigTaskVisitor.TaskContext(context.getQueryId().getId()));
+            new ConfigTaskVisitor.TaskContext(
+                context.getQueryId().getId(),
+                context.getSql(),
+                context.getSession().getUserName()));
     this.resultSetConsumed = false;
     if (config.isClusterMode()) {
       configTaskExecutor = ClusterConfigTaskExecutor.getInstance();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index c97ea6758d..8aaea62cc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -361,7 +361,8 @@ public class ConfigTaskVisitor
   @Override
   public IConfigTask visitCreateContinuousQuery(
       CreateContinuousQueryStatement createContinuousQueryStatement, TaskContext context) {
-    return new CreateContinuousQueryTask(createContinuousQueryStatement);
+    return new CreateContinuousQueryTask(
+        createContinuousQueryStatement, context.sql, context.username);
   }
 
   @Override
@@ -380,12 +381,26 @@ public class ConfigTaskVisitor
 
     private final String queryId;
 
-    public TaskContext(String queryId) {
+    private final String sql;
+
+    private final String username;
+
+    public TaskContext(String queryId, String sql, String username) {
       this.queryId = queryId;
+      this.sql = sql;
+      this.username = username;
     }
 
     public String getQueryId() {
       return queryId;
     }
+
+    public String getSql() {
+      return sql;
+    }
+
+    public String getUsername() {
+      return username;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index de49ae98ea..a85a060b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1275,11 +1275,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> createContinuousQuery(
-      CreateContinuousQueryStatement createContinuousQueryStatement) {
+      CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
     createContinuousQueryStatement.semanticCheck();
 
     String queryBody = createContinuousQueryStatement.getQueryBody();
-    String sql = createContinuousQueryStatement.getSql();
     // TODO Do not modify Statement in Analyzer
     Analyzer.validate(createContinuousQueryStatement.getQueryBodyStatement());
 
@@ -1296,7 +1295,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
               createContinuousQueryStatement.getTimeoutPolicy().getType(),
               queryBody,
               sql,
-              createContinuousQueryStatement.getZoneId());
+              createContinuousQueryStatement.getZoneId(),
+              username);
       final TSStatus executionStatus = client.createCQ(tCreateCQReq);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
         LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 9c2f5e07d1..9ad0a4aca2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -148,7 +148,7 @@ public interface IConfigTaskExecutor {
       GetTimeSlotListStatement getTimeSlotListStatement);
 
   SettableFuture<ConfigTaskResult> createContinuousQuery(
-      CreateContinuousQueryStatement createContinuousQueryStatement);
+      CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username);
 
   SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index bc1aad48bf..726e368558 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -646,7 +646,7 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> createContinuousQuery(
-      CreateContinuousQueryStatement createContinuousQueryStatement) {
+      CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try {
       // todo: implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
index 060162e731..c1d9859cc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -30,13 +30,20 @@ public class CreateContinuousQueryTask implements IConfigTask {
 
   private final CreateContinuousQueryStatement createContinuousQueryStatement;
 
-  public CreateContinuousQueryTask(CreateContinuousQueryStatement createContinuousQueryStatement) {
+  private final String sql;
+
+  private final String username;
+
+  public CreateContinuousQueryTask(
+      CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
     this.createContinuousQueryStatement = createContinuousQueryStatement;
+    this.sql = sql;
+    this.username = username;
   }
 
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement);
+    return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement, sql, username);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 77ed3398ff..7f7bbf47d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -84,10 +84,11 @@ import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.GreaterEqualExpression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.LessThanExpression;
 import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
-import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -714,7 +715,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) throws TException {
+  public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) {
     try {
       PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
       SyncService.getInstance().addPipe(pipeInfo);
@@ -725,7 +726,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) throws TException {
+  public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) {
     try {
       switch (SyncOperation.values()[req.getOperation()]) {
         case START_PIPE:
@@ -748,10 +749,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus executeCQ(TExecuteCQ req) throws TException {
+  public TSStatus executeCQ(TExecuteCQ req) {
 
     long sessionId =
-        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
+        SESSION_MANAGER.requestSessionId(
+            req.username, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
     String executedSQL = req.queryBody;
 
     try {
@@ -766,10 +768,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
       // 1. add time filter in where
       Expression timeFilter =
-          new BetweenExpression(
-              new TimestampOperand(),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+          new LogicAndExpression(
+              new GreaterEqualExpression(
+                  new TimestampOperand(),
+                  new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime))),
+              new LessThanExpression(
+                  new TimestampOperand(),
+                  new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime))));
       if (s.getWhereCondition() != null) {
         s.getWhereCondition()
             .setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
@@ -781,6 +786,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       if (s.getGroupByTimeComponent() != null) {
         s.getGroupByTimeComponent().setStartTime(req.startTime);
         s.getGroupByTimeComponent().setEndTime(req.endTime);
+        s.getGroupByTimeComponent().setLeftCRightO(true);
       }
       executedSQL = String.join(" ", s.constructFormattedSQL().split("\n")).replaceAll(" +", " ");
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 40d1043bd0..8f36f49994 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -568,6 +568,7 @@ struct TCreateCQReq {
   7: required string queryBody
   8: required string sql
   9: required string zoneId
+  10: required string username
 }
 
 struct TDropCQReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 7b8723ff32..6e33ef80b4 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -331,6 +331,7 @@ struct TExecuteCQ {
   4: required i64 timeout
   5: required string zoneId
   6: required string cqId
+  7: required string username
 }
 
 service IDataNodeRPCService {