You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/17 08:45:57 UTC
[iotdb] branch IOTDB-4619 updated: Add more logs in CQScheduleTask
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new ebb6ce53ee Add more logs in CQScheduleTask
ebb6ce53ee is described below
commit ebb6ce53ee51511c7aa5a49e8159bc45eff6157b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Oct 17 16:45:41 2022 +0800
Add more logs in CQScheduleTask
---
.../confignode/manager/cq/CQScheduleTask.java | 27 +++++++++++++++++++++-
1 file changed, 26 insertions(+), 1 deletion(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 5c4d777157..e0c734fc2a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -148,12 +148,18 @@ public class CQScheduleTask implements Runnable {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
}
} else {
+ LOGGER.info(
+ "[StartExecuteCQ] {}, time range is [{}, {}), current time is {}",
+ cqId,
+ startTime,
+ endTime,
+ System.currentTimeMillis());
TExecuteCQ executeCQReq =
new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
try {
AsyncDataNodeInternalServiceClient client =
AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
- client.executeCQ(executeCQReq, new AsyncExecuteCQCallback());
+ client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, endTime));
} catch (Throwable t) {
LOGGER.warn("Execute CQ {} failed", cqId, t);
if (needSubmit()) {
@@ -190,10 +196,25 @@ public class CQScheduleTask implements Runnable {
private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> {
+ private final long startTime;
+ private final long endTime;
+
+ public AsyncExecuteCQCallback(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
@Override
public void onComplete(TSStatus response) {
if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.info(
+ "[EndExecuteCQ] {}, time range is [{}, {}), current time is {}",
+ cqId,
+ startTime,
+ endTime,
+ System.currentTimeMillis());
+
ConsensusWriteResponse result =
configManager
.getConsensusManager()
@@ -203,6 +224,10 @@ public class CQScheduleTask implements Runnable {
if (needSubmit()) {
updateExecutionTime();
submitSelf();
+ } else {
+ LOGGER.info(
+ "Stop submitting CQ {} because current node is not leader or current scheduled thread pool is shut down.",
+ cqId);
}
} else {
LOGGER.warn(