You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/17 08:45:57 UTC

[iotdb] branch IOTDB-4619 updated: Add more logs in CQScheduleTask

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new ebb6ce53ee Add more logs in CQScheduleTask
ebb6ce53ee is described below

commit ebb6ce53ee51511c7aa5a49e8159bc45eff6157b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Oct 17 16:45:41 2022 +0800

    Add more logs in CQScheduleTask
---
 .../confignode/manager/cq/CQScheduleTask.java      | 27 +++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 5c4d777157..e0c734fc2a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -148,12 +148,18 @@ public class CQScheduleTask implements Runnable {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
       }
     } else {
+      LOGGER.info(
+          "[StartExecuteCQ] {}, time range is [{}, {}), current time is {}",
+          cqId,
+          startTime,
+          endTime,
+          System.currentTimeMillis());
       TExecuteCQ executeCQReq =
           new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
       try {
         AsyncDataNodeInternalServiceClient client =
             AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
-        client.executeCQ(executeCQReq, new AsyncExecuteCQCallback());
+        client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, endTime));
       } catch (Throwable t) {
         LOGGER.warn("Execute CQ {} failed", cqId, t);
         if (needSubmit()) {
@@ -190,10 +196,25 @@ public class CQScheduleTask implements Runnable {
 
   private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> {
 
+    private final long startTime;
+    private final long endTime;
+
+    public AsyncExecuteCQCallback(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
     @Override
     public void onComplete(TSStatus response) {
       if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
 
+        LOGGER.info(
+            "[EndExecuteCQ] {}, time range is [{}, {}), current time is {}",
+            cqId,
+            startTime,
+            endTime,
+            System.currentTimeMillis());
+
         ConsensusWriteResponse result =
             configManager
                 .getConsensusManager()
@@ -203,6 +224,10 @@ public class CQScheduleTask implements Runnable {
           if (needSubmit()) {
             updateExecutionTime();
             submitSelf();
+          } else {
+            LOGGER.info(
+                "Stop submitting CQ {} because current node is not leader or current scheduled thread pool is shut down.",
+                cqId);
           }
         } else {
           LOGGER.warn(