You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/27 04:24:12 UTC
[iotdb] branch IOTDB-4619 updated: Fix CI by adding username
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new ad322e8b42 Fix CI by adding username
ad322e8b42 is described below
commit ad322e8b42e4f8d6b5b554038fefe34c3e836523
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 27 12:23:32 2022 +0800
Fix CI by adding username
---
.../confignode/manager/cq/CQScheduleTask.java | 9 +-
.../iotdb/confignode/persistence/cq/CQInfo.java | 18 ++++
.../request/ConfigPhysicalPlanSerDeTest.java | 4 +-
.../iotdb/confignode/persistence/CQInfoTest.java | 6 +-
docs/UserGuide/Process-Data/Continuous-Query.md | 8 +-
docs/zh/UserGuide/Process-Data/Continuous-Query.md | 8 +-
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 6 +-
.../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java | 69 +++++++------
.../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java | 114 +--------------------
.../iotdb/db/mpp/common/MPPQueryContext.java | 4 +
.../mpp/plan/execution/config/ConfigExecution.java | 5 +-
.../plan/execution/config/ConfigTaskVisitor.java | 19 +++-
.../config/executor/ClusterConfigTaskExecutor.java | 6 +-
.../config/executor/IConfigTaskExecutor.java | 2 +-
.../executor/StandaloneConfigTaskExecutor.java | 2 +-
.../config/metadata/CreateContinuousQueryTask.java | 11 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 24 +++--
.../src/main/thrift/confignode.thrift | 1 +
thrift/src/main/thrift/datanode.thrift | 1 +
19 files changed, 144 insertions(+), 173 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index fdee430730..f60bc09080 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -54,6 +54,9 @@ public class CQScheduleTask implements Runnable {
private final String md5;
private final String zoneId;
+
+ private final String username;
+
private final ScheduledExecutorService executor;
private final ConfigManager configManager;
@@ -77,6 +80,7 @@ public class CQScheduleTask implements Runnable {
req.queryBody,
md5,
req.zoneId,
+ req.username,
executor,
configManager,
firstExecutionTime);
@@ -93,6 +97,7 @@ public class CQScheduleTask implements Runnable {
entry.getQueryBody(),
entry.getMd5(),
entry.getZoneId(),
+ entry.getUsername(),
executor,
configManager,
entry.getLastExecutionTime() + entry.getEveryInterval());
@@ -107,6 +112,7 @@ public class CQScheduleTask implements Runnable {
String queryBody,
String md5,
String zoneId,
+ String username,
ScheduledExecutorService executor,
ConfigManager configManager,
long executionTime) {
@@ -118,6 +124,7 @@ public class CQScheduleTask implements Runnable {
this.queryBody = queryBody;
this.md5 = md5;
this.zoneId = zoneId;
+ this.username = username;
this.executor = executor;
this.configManager = configManager;
this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
@@ -160,7 +167,7 @@ public class CQScheduleTask implements Runnable {
endTime,
System.currentTimeMillis());
TExecuteCQ executeCQReq =
- new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
+ new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId, username);
try {
AsyncDataNodeInternalServiceClient client =
AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index e0db60e859..e90c7167d0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -114,12 +114,15 @@ public class CQInfo implements SnapshotProcessor {
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format("CQ %s doesn't exist.", cqId);
+ LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
} else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format("MD5 of CQ %s doesn't match", cqId);
+ LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
} else {
cqMap.remove(cqId);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ LOGGER.info("Drop CQ {} successfully.", cqId);
}
return res;
} finally {
@@ -291,6 +294,8 @@ public class CQInfo implements SnapshotProcessor {
private final String zoneId;
+ private final String username;
+
private CQState state;
private long lastExecutionTime;
@@ -306,6 +311,7 @@ public class CQInfo implements SnapshotProcessor {
req.sql,
md5,
req.zoneId,
+ req.username,
CQState.INACTIVE,
lastExecutionTime);
}
@@ -322,6 +328,7 @@ public class CQInfo implements SnapshotProcessor {
other.sql,
other.md5,
other.zoneId,
+ other.username,
other.state,
other.lastExecutionTime);
}
@@ -337,6 +344,7 @@ public class CQInfo implements SnapshotProcessor {
String sql,
String md5,
String zoneId,
+ String username,
CQState state,
long lastExecutionTime) {
this.cqId = cqId;
@@ -349,6 +357,7 @@ public class CQInfo implements SnapshotProcessor {
this.sql = sql;
this.md5 = md5;
this.zoneId = zoneId;
+ this.username = username;
this.state = state;
this.lastExecutionTime = lastExecutionTime;
}
@@ -364,6 +373,7 @@ public class CQInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(sql, stream);
ReadWriteIOUtils.write(md5, stream);
ReadWriteIOUtils.write(zoneId, stream);
+ ReadWriteIOUtils.write(username, stream);
ReadWriteIOUtils.write(state.getType(), stream);
ReadWriteIOUtils.write(lastExecutionTime, stream);
}
@@ -379,6 +389,7 @@ public class CQInfo implements SnapshotProcessor {
String sql = ReadWriteIOUtils.readString(stream);
String md5 = ReadWriteIOUtils.readString(stream);
String zoneId = ReadWriteIOUtils.readString(stream);
+ String username = ReadWriteIOUtils.readString(stream);
CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
long lastExecutionTime = ReadWriteIOUtils.readLong(stream);
return new CQEntry(
@@ -392,6 +403,7 @@ public class CQInfo implements SnapshotProcessor {
sql,
md5,
zoneId,
+ username,
state,
lastExecutionTime);
}
@@ -444,6 +456,10 @@ public class CQInfo implements SnapshotProcessor {
return zoneId;
}
+ public String getUsername() {
+ return username;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -460,6 +476,7 @@ public class CQInfo implements SnapshotProcessor {
&& Objects.equals(sql, cqEntry.sql)
&& Objects.equals(md5, cqEntry.md5)
&& Objects.equals(zoneId, cqEntry.zoneId)
+ && Objects.equals(username, cqEntry.username)
&& state == cqEntry.state;
}
@@ -476,6 +493,7 @@ public class CQInfo implements SnapshotProcessor {
sql,
md5,
zoneId,
+ username,
state,
lastExecutionTime);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index fa45da1f5d..544f0b4895 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1056,7 +1056,8 @@ public class ConfigPhysicalPlanSerDeTest {
(byte) 0,
"select s1 into root.backup.d1.s1 from root.sg.d1",
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
- "Asia"),
+ "Asia",
+ "root"),
"testCq1_md5",
executionTime);
AddCQPlan addCQPlan1 =
@@ -1098,6 +1099,7 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(updateCQLastExecTimePlan0, updateCQLastExecTimePlan1);
}
+ @Test
public void GetTriggerJarPlanTest() throws IOException {
List<String> jarNames = new ArrayList<>();
jarNames.add("test1");
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 9d9b6105c8..9780e4fefe 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -68,7 +68,8 @@ public class CQInfoTest {
(byte) 0,
"select s1 into root.backup.d1.s1 from root.sg.d1",
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
- "Asia"),
+ "Asia",
+ "root"),
"testCq1_md5",
executionTime);
@@ -86,7 +87,8 @@ public class CQInfoTest {
(byte) 1,
"select s1 into root.backup.d2.s1 from root.sg.d2",
"create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END",
- "Asia"),
+ "Asia",
+ "root"),
"testCq2_md5",
executionTime);
cqInfo.addCQ(addCQPlan);
diff --git a/docs/UserGuide/Process-Data/Continuous-Query.md b/docs/UserGuide/Process-Data/Continuous-Query.md
index b7f727bb0c..b7a73514aa 100644
--- a/docs/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/UserGuide/Process-Data/Continuous-Query.md
@@ -125,7 +125,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
END
```
@@ -182,7 +182,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
END
```
@@ -257,7 +257,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
FILL(100.0)
END
```
@@ -321,7 +321,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
FILL(100.0)
END
```
diff --git a/docs/zh/UserGuide/Process-Data/Continuous-Query.md b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
index 4527776535..ec18957052 100644
--- a/docs/zh/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
@@ -126,7 +126,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
END
```
@@ -184,7 +184,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
END
```
@@ -259,7 +259,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
FILL(100.0)
END
```
@@ -324,7 +324,7 @@ BEGIN
SELECT max_value(temperature)
INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
FROM root.ln.*.*
- GROUP BY time(10s)
+ GROUP BY(10s)
FILL(100.0)
END
```
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index c896d4379b..c822736463 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -302,7 +302,8 @@ public class IoTDBConfigNodeSnapshotIT {
(byte) 0,
"select s1 into root.backup.d1(s1) from root.sg.d1",
sql1,
- "Asia");
+ "Asia",
+ "root");
TCreateCQReq req2 =
new TCreateCQReq(
"testCq2",
@@ -313,7 +314,8 @@ public class IoTDBConfigNodeSnapshotIT {
(byte) 1,
"select s1 into root.backup.d2(s1) from root.sg.d2",
sql2,
- "Asia");
+ "Asia",
+ "root");
assertEquals(client.createCQ(req1).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
assertEquals(client.createCQ(req2).getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
index 48d1b404ff..7aaecba327 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -96,7 +96,7 @@ public class IoTDBCQExecIT {
+ " SELECT max_value(s1) \n"
+ " INTO root.sg.d1(s1_max)\n"
+ " FROM root.sg.d1\n"
- + " GROUP BY time(1s) \n"
+ + " GROUP BY(1s) \n"
+ "END");
long targetTime = firstExecutionTime + 5_000;
@@ -118,10 +118,10 @@ public class IoTDBCQExecIT {
cnt++;
}
assertEquals(expectedTime.length, cnt);
+ } finally {
+ statement.execute("DROP CQ cq1");
}
- statement.execute("DROP CQ cq1");
-
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -136,7 +136,7 @@ public class IoTDBCQExecIT {
Statement statement = connection.createStatement()) {
long now = System.currentTimeMillis();
long firstExecutionTime = now + 10_000;
- long startTime = firstExecutionTime - 4_000;
+ long startTime = firstExecutionTime - 3_000;
statement.execute("create timeseries root.sg.d2.s1 WITH DATATYPE=INT64");
statement.execute("create timeseries root.sg.d2.s1_max WITH DATATYPE=INT64");
@@ -174,7 +174,7 @@ public class IoTDBCQExecIT {
+ " SELECT max_value(s1) \n"
+ " INTO root.sg.d2(s1_max)\n"
+ " FROM root.sg.d2\n"
- + " GROUP BY time(1s) \n"
+ + " GROUP BY(1s) \n"
+ "END");
long targetTime = firstExecutionTime + 5_000;
@@ -196,10 +196,10 @@ public class IoTDBCQExecIT {
cnt++;
}
assertEquals(expectedTime.length, cnt);
+ } finally {
+ statement.execute("DROP CQ cq2");
}
- statement.execute("DROP CQ cq2");
-
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -214,7 +214,7 @@ public class IoTDBCQExecIT {
Statement statement = connection.createStatement()) {
long now = System.currentTimeMillis();
long firstExecutionTime = now + 10_000;
- long startTime = firstExecutionTime - 4_000;
+ long startTime = firstExecutionTime - 3_000;
statement.execute("create timeseries root.sg.d3.s1 WITH DATATYPE=INT64");
statement.execute("create timeseries root.sg.d3.s1_max WITH DATATYPE=INT64");
@@ -245,14 +245,14 @@ public class IoTDBCQExecIT {
statement.execute(
"CREATE CONTINUOUS QUERY cq3\n"
- + "RESAMPLE EVERY 20s\n"
+ + "RESAMPLE EVERY 2s\n"
+ String.format("BOUNDARY %d\n", firstExecutionTime)
- + "RANGE 40s\n"
+ + "RANGE 4s\n"
+ "BEGIN \n"
+ " SELECT max_value(s1) \n"
+ " INTO root.sg.d3(s1_max)\n"
+ " FROM root.sg.d3\n"
- + " GROUP BY time(1s) \n"
+ + " GROUP BY(1s) \n"
+ " FILL(100)\n"
+ "END");
@@ -272,7 +272,12 @@ public class IoTDBCQExecIT {
};
long[] expectedValue = {100, 2, 4, 6, 8, 10};
- try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d3")) {
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "select s1_max from root.sg.d3 where time between "
+ + (startTime - 1_000)
+ + " and "
+ + (startTime + 4_000))) {
int cnt = 0;
while (resultSet.next()) {
assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
@@ -280,10 +285,10 @@ public class IoTDBCQExecIT {
cnt++;
}
assertEquals(expectedTime.length, cnt);
+ } finally {
+ statement.execute("DROP CQ cq3");
}
- statement.execute("DROP CQ cq3");
-
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -298,7 +303,7 @@ public class IoTDBCQExecIT {
Statement statement = connection.createStatement()) {
long now = System.currentTimeMillis();
long firstExecutionTime = now + 10_000;
- long startTime = firstExecutionTime - 4_000;
+ long startTime = firstExecutionTime - 3_000;
statement.execute("create timeseries root.sg.d4.s1 WITH DATATYPE=INT64");
statement.execute("create timeseries root.sg.d4.s1_max WITH DATATYPE=INT64");
@@ -329,14 +334,14 @@ public class IoTDBCQExecIT {
statement.execute(
"CREATE CONTINUOUS QUERY cq4\n"
- + "RESAMPLE EVERY 20s\n"
+ + "RESAMPLE EVERY 2s\n"
+ String.format("BOUNDARY %d\n", firstExecutionTime)
- + "RANGE 20s, 10s\n"
+ + "RANGE 2s, 1s\n"
+ "BEGIN \n"
+ " SELECT max_value(s1) \n"
+ " INTO root.sg.d4(s1_max)\n"
+ " FROM root.sg.d4\n"
- + " GROUP BY time(1s) \n"
+ + " GROUP BY(1s) \n"
+ "END");
long targetTime = firstExecutionTime + 5_000;
@@ -345,8 +350,8 @@ public class IoTDBCQExecIT {
TimeUnit.SECONDS.sleep(1);
}
- long[] expectedTime = {startTime + 2_000, startTime + 4_000};
- long[] expectedValue = {6, 10};
+ long[] expectedTime = {startTime + 1_000, startTime + 3_000};
+ long[] expectedValue = {4, 8};
try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d4")) {
int cnt = 0;
@@ -356,10 +361,10 @@ public class IoTDBCQExecIT {
cnt++;
}
assertEquals(expectedTime.length, cnt);
+ } finally {
+ statement.execute("DROP CQ cq4");
}
- statement.execute("DROP CQ cq4");
-
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -374,10 +379,10 @@ public class IoTDBCQExecIT {
Statement statement = connection.createStatement()) {
long now = System.currentTimeMillis();
long firstExecutionTime = now + 10_000;
- long startTime = firstExecutionTime - 4_000;
+ long startTime = firstExecutionTime - 3_000;
statement.execute("create timeseries root.sg.d5.s1 WITH DATATYPE=INT64");
- statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=INT64");
+ statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=DOUBLE");
statement.execute(
String.format(
@@ -407,6 +412,7 @@ public class IoTDBCQExecIT {
"CREATE CONTINUOUS QUERY cq5\n"
+ "RESAMPLE EVERY 2s\n"
+ String.format("BOUNDARY %d\n", firstExecutionTime)
+ + "RANGE 4s\n"
+ "BEGIN \n"
+ " SELECT s1 + 1 \n"
+ " INTO root.sg.d5(precalculated_s1)\n"
@@ -432,21 +438,26 @@ public class IoTDBCQExecIT {
startTime + 4_000,
startTime + 4_500
};
- long[] expectedValue = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
+ double[] expectedValue = {2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0};
try (ResultSet resultSet =
- statement.executeQuery("select precalculated_s1 from root.sg.d5")) {
+ statement.executeQuery(
+ "select precalculated_s1 from root.sg.d5 where time between "
+ + startTime
+ + " and "
+ + (startTime + 4_500))) {
int cnt = 0;
while (resultSet.next()) {
assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
- assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d5.precalculated_s1"));
+ assertEquals(
+ expectedValue[cnt], resultSet.getDouble("root.sg.d5.precalculated_s1"), 0.00001);
cnt++;
}
assertEquals(expectedTime.length, cnt);
+ } finally {
+ statement.execute("DROP CQ cq5");
}
- statement.execute("DROP CQ cq5");
-
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
index 7910ffeae5..84aff4a58a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -430,60 +430,6 @@ public class IoTDBCQIT {
+ " GROUP BY(10m)\n"
+ "END"
};
- String[] formattedCqSQLs = {
- "CREATE CQ show_cq_1\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms, 600000ms\n"
- + "TIMEOUT POLICY BLOCKED\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (1800000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ show_cq_2\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY BLOCKED\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (1800000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ show_cq_3\n"
- + "RESAMPLE\n"
- + "\tEVERY 600000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY DISCARD\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (600000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ show_cq_4\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY DISCARD\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (600000ms)\n"
- + "END\n"
- + ";"
- };
for (String sql : cqSQLs) {
statement.execute(sql);
@@ -495,7 +441,7 @@ public class IoTDBCQIT {
while (resultSet.next()) {
// No need to add time column for aggregation query
assertEquals(cqIds[cnt], resultSet.getString(1));
- assertEquals(formattedCqSQLs[cnt], resultSet.getString(2));
+ assertEquals(cqSQLs[cnt], resultSet.getString(2));
assertEquals("ACTIVE", resultSet.getString(3));
cnt++;
}
@@ -558,60 +504,6 @@ public class IoTDBCQIT {
+ " GROUP BY(10m)\n"
+ "END"
};
- String[] formattedCqSQLs = {
- "CREATE CQ drop_cq_1\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms, 600000ms\n"
- + "TIMEOUT POLICY BLOCKED\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (1800000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ drop_cq_2\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY BLOCKED\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (1800000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ drop_cq_3\n"
- + "RESAMPLE\n"
- + "\tEVERY 600000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY DISCARD\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (600000ms)\n"
- + "END\n"
- + ";",
- "CREATE CQ drop_cq_4\n"
- + "RESAMPLE\n"
- + "\tEVERY 1800000ms\n"
- + "\tBOUNDARY 0\n"
- + "\tRANGE 1800000ms\n"
- + "TIMEOUT POLICY DISCARD\n"
- + "BEGIN\n"
- + "\tSELECT count(s1)\n"
- + "\t\tINTO root.sg_count.d(count_s1)\n"
- + "\t\tFROM root.sg.d\n"
- + "\t\tGROUP BY TIME (600000ms)\n"
- + "END\n"
- + ";"
- };
for (String sql : cqSQLs) {
statement.execute(sql);
@@ -623,7 +515,7 @@ public class IoTDBCQIT {
while (resultSet.next()) {
// No need to add time column for aggregation query
assertEquals(cqIds[cnt], resultSet.getString(1));
- assertEquals(formattedCqSQLs[cnt], resultSet.getString(2));
+ assertEquals(cqSQLs[cnt], resultSet.getString(2));
assertEquals("ACTIVE", resultSet.getString(3));
cnt++;
}
@@ -641,7 +533,7 @@ public class IoTDBCQIT {
while (resultSet.next()) {
// No need to add time column for aggregation query
assertEquals(cqIds[resultIndex[cnt]], resultSet.getString(1));
- assertEquals(formattedCqSQLs[resultIndex[cnt]], resultSet.getString(2));
+ assertEquals(cqSQLs[resultIndex[cnt]], resultSet.getString(2));
assertEquals("ACTIVE", resultSet.getString(3));
cnt++;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 57e201ecd7..5fecb985ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -144,4 +144,8 @@ public class MPPQueryContext {
public TypeProvider getTypeProvider() {
return typeProvider;
}
+
+ public String getSql() {
+ return sql;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index c73ec144a1..4d97d9dd2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -75,7 +75,10 @@ public class ConfigExecution implements IQueryExecution {
this.task =
statement.accept(
new ConfigTaskVisitor(),
- new ConfigTaskVisitor.TaskContext(context.getQueryId().getId()));
+ new ConfigTaskVisitor.TaskContext(
+ context.getQueryId().getId(),
+ context.getSql(),
+ context.getSession().getUserName()));
this.resultSetConsumed = false;
if (config.isClusterMode()) {
configTaskExecutor = ClusterConfigTaskExecutor.getInstance();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index c97ea6758d..8aaea62cc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -361,7 +361,8 @@ public class ConfigTaskVisitor
@Override
public IConfigTask visitCreateContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, TaskContext context) {
- return new CreateContinuousQueryTask(createContinuousQueryStatement);
+ return new CreateContinuousQueryTask(
+ createContinuousQueryStatement, context.sql, context.username);
}
@Override
@@ -380,12 +381,26 @@ public class ConfigTaskVisitor
private final String queryId;
- public TaskContext(String queryId) {
+ private final String sql;
+
+ private final String username;
+
+ public TaskContext(String queryId, String sql, String username) {
this.queryId = queryId;
+ this.sql = sql;
+ this.username = username;
}
public String getQueryId() {
return queryId;
}
+
+ public String getSql() {
+ return sql;
+ }
+
+ public String getUsername() {
+ return username;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index de49ae98ea..a85a060b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1275,11 +1275,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createContinuousQuery(
- CreateContinuousQueryStatement createContinuousQueryStatement) {
+ CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
createContinuousQueryStatement.semanticCheck();
String queryBody = createContinuousQueryStatement.getQueryBody();
- String sql = createContinuousQueryStatement.getSql();
// TODO Do not modify Statement in Analyzer
Analyzer.validate(createContinuousQueryStatement.getQueryBodyStatement());
@@ -1296,7 +1295,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
createContinuousQueryStatement.getTimeoutPolicy().getType(),
queryBody,
sql,
- createContinuousQueryStatement.getZoneId());
+ createContinuousQueryStatement.getZoneId(),
+ username);
final TSStatus executionStatus = client.createCQ(tCreateCQReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 9c2f5e07d1..9ad0a4aca2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -148,7 +148,7 @@ public interface IConfigTaskExecutor {
GetTimeSlotListStatement getTimeSlotListStatement);
SettableFuture<ConfigTaskResult> createContinuousQuery(
- CreateContinuousQueryStatement createContinuousQueryStatement);
+ CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username);
SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index bc1aad48bf..726e368558 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -646,7 +646,7 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createContinuousQuery(
- CreateContinuousQueryStatement createContinuousQueryStatement) {
+ CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try {
// todo: implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
index 060162e731..c1d9859cc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -30,13 +30,20 @@ public class CreateContinuousQueryTask implements IConfigTask {
private final CreateContinuousQueryStatement createContinuousQueryStatement;
- public CreateContinuousQueryTask(CreateContinuousQueryStatement createContinuousQueryStatement) {
+ private final String sql;
+
+ private final String username;
+
+ public CreateContinuousQueryTask(
+ CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
this.createContinuousQueryStatement = createContinuousQueryStatement;
+ this.sql = sql;
+ this.username = username;
}
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement);
+ return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement, sql, username);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 77ed3398ff..7f7bbf47d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -84,10 +84,11 @@ import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.GreaterEqualExpression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.LessThanExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
-import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -714,7 +715,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) throws TException {
+ public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) {
try {
PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
SyncService.getInstance().addPipe(pipeInfo);
@@ -725,7 +726,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) throws TException {
+ public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) {
try {
switch (SyncOperation.values()[req.getOperation()]) {
case START_PIPE:
@@ -748,10 +749,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus executeCQ(TExecuteCQ req) throws TException {
+ public TSStatus executeCQ(TExecuteCQ req) {
long sessionId =
- SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
+ SESSION_MANAGER.requestSessionId(
+ req.username, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
String executedSQL = req.queryBody;
try {
@@ -766,10 +768,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
// 1. add time filter in where
Expression timeFilter =
- new BetweenExpression(
- new TimestampOperand(),
- new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
- new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+ new LogicAndExpression(
+ new GreaterEqualExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime))),
+ new LessThanExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime))));
if (s.getWhereCondition() != null) {
s.getWhereCondition()
.setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
@@ -781,6 +786,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
if (s.getGroupByTimeComponent() != null) {
s.getGroupByTimeComponent().setStartTime(req.startTime);
s.getGroupByTimeComponent().setEndTime(req.endTime);
+ s.getGroupByTimeComponent().setLeftCRightO(true);
}
executedSQL = String.join(" ", s.constructFormattedSQL().split("\n")).replaceAll(" +", " ");
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 40d1043bd0..8f36f49994 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -568,6 +568,7 @@ struct TCreateCQReq {
7: required string queryBody
8: required string sql
9: required string zoneId
+ 10: required string username
}
struct TDropCQReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 7b8723ff32..6e33ef80b4 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -331,6 +331,7 @@ struct TExecuteCQ {
4: required i64 timeout
5: required string zoneId
6: required string cqId
+ 7: required string username
}
service IDataNodeRPCService {