You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text rendering.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/13 10:09:52 UTC

[iotdb] branch IOTDB-4619 updated: Finish recovery phase

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new 9138c5babd Finish recovery phase
9138c5babd is described below

commit 9138c5babdc5baf87a92dd50484c304ac99881eb
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 13 18:06:31 2022 +0800

    Finish recovery phase
---
 .../confignode/consensus/response/ShowCQResp.java  | 16 ++++-
 .../statemachine/PartitionRegionStateMachine.java  |  2 +
 .../iotdb/confignode/manager/cq/CQManager.java     | 61 ++++++++++++++++++-
 .../confignode/manager/cq/CQScheduleTask.java      | 54 ++++++++++++++---
 .../iotdb/confignode/persistence/cq/CQInfo.java    | 68 ++++++++++++++++++++--
 5 files changed, 184 insertions(+), 17 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
index 36d9ced1d9..44a8de0080 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.confignode.consensus.response;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -26,18 +27,31 @@ import org.apache.iotdb.consensus.common.DataSet;
 import javax.validation.constraints.NotNull;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class ShowCQResp implements DataSet {
 
   private final TSStatus status;
-  private final List<TCQEntry> cqList;
+  /** CQ entries carried by this response. */
+  private final List<CQInfo.CQEntry> cqList;
 
-  public ShowCQResp(@NotNull TSStatus status, @NotNull List<TCQEntry> cqList) {
+  public ShowCQResp(@NotNull TSStatus status, @NotNull List<CQInfo.CQEntry> cqList) {
     this.status = status;
     this.cqList = cqList;
   }
 
+  /** Converts this response into its thrift RPC form. */
   public TShowCQResp convertToRpcShowCQResp() {
-    return new TShowCQResp(status, cqList);
+    // project each entry onto the (cqId, sql, state type) triple exposed over RPC
+    List<TCQEntry> rpcEntries =
+        cqList.stream()
+            .map(e -> new TCQEntry(e.getCqId(), e.getSql(), e.getState().getType()))
+            .collect(Collectors.toList());
+    return new TShowCQResp(status, rpcEntries);
+  }
+
+  /** Returns the CQ entries carried by this response (the list itself is not copied). */
+  public List<CQInfo.CQEntry> getCqList() {
+    return cqList;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index b45c933671..be4936ad5e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -152,6 +152,7 @@ public class PartitionRegionStateMachine
       configManager.getLoadManager().startLoadBalancingService();
       configManager.getNodeManager().startHeartbeatService();
       configManager.getPartitionManager().startRegionCleaner();
+      configManager.getCQManager().startCQScheduler();
     } else {
       LOGGER.info(
           "Current node {} is not longer the leader, the new leader is {}", currentNode, newLeader);
@@ -159,6 +160,7 @@ public class PartitionRegionStateMachine
       configManager.getLoadManager().stopLoadBalancingService();
       configManager.getNodeManager().stopHeartbeatService();
       configManager.getPartitionManager().stopRegionCleaner();
+      configManager.getCQManager().stopCQScheduler();
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 9914369a1c..b0995f4496 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -20,12 +20,14 @@ package org.apache.iotdb.confignode.manager.cq;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.cq.CQState;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.response.ShowCQResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
@@ -37,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -105,8 +108,99 @@ public class CQManager {
      res = executor;
     } finally {
       lock.readLock().unlock();
-      ;
     }
     return res;
   }
+
+  /** Pause between two attempts to read the CQ list through the consensus layer. */
+  private static final long FETCH_CQ_RETRY_INTERVAL_IN_MS = 100;
+
+  /**
+   * Starts (or restarts) CQ scheduling on this ConfigNode.
+   *
+   * <p>Shuts down the previous scheduling thread pool (if any), creates a new one, then reads all
+   * CQs through the consensus layer and re-submits every CQ whose state is {@link CQState#ACTIVE}.
+   * The whole procedure runs under the write lock.
+   */
+  public void startCQScheduler() {
+    lock.writeLock().lock();
+    try {
+      // 1. shutdown previous cq schedule thread pool; executor is null after stopCQScheduler()
+      if (executor != null) {
+        try {
+          executor.shutdown();
+        } catch (Throwable t) {
+          // just print the error log because we should make sure we can start a new cq schedule
+          // pool successfully in the next steps
+          LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t);
+        }
+      }
+
+      // 2. start a new schedule thread pool
+      executor =
+          IoTDBThreadPoolFactory.newScheduledThreadPool(CONF.getCqSubmitThread(), "CQ-Scheduler");
+
+      // 3. get all CQs; null if this node stopped being leader (or was interrupted) meanwhile
+      List<CQInfo.CQEntry> allCQs = fetchAllCQs();
+
+      // 4. recover the scheduling of active CQs
+      if (allCQs != null) {
+        for (CQInfo.CQEntry entry : allCQs) {
+          if (entry.getState() == CQState.ACTIVE) {
+            CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
+            cqScheduleTask.submitSelf();
+          }
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Reads all CQ entries through the consensus layer, retrying on failure while this node is still
+   * the leader.
+   *
+   * @return all CQ entries, or {@code null} if this node stopped being the leader or the thread
+   *     was interrupted before a successful read
+   */
+  private List<CQInfo.CQEntry> fetchAllCQs() {
+    while (configManager.getConsensusManager().isLeader()) {
+      ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan());
+      if (response.getDataset() != null) {
+        return ((ShowCQResp) response.getDataset()).getCqList();
+      }
+      // consensus layer related errors
+      LOGGER.warn("Unexpected error happened while fetching cq list: ", response.getException());
+      try {
+        // back off instead of spinning on the CPU (and on the write lock held by the caller)
+        Thread.sleep(FETCH_CQ_RETRY_INTERVAL_IN_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Stops CQ scheduling on this ConfigNode.
+   *
+   * <p>The current thread pool is detached under the write lock (later readers see {@code null})
+   * and shut down afterwards, outside the lock.
+   */
+  public void stopCQScheduler() {
+    ScheduledExecutorService previous;
+    lock.writeLock().lock();
+    try {
+      previous = executor;
+      executor = null;
+    } finally {
+      lock.writeLock().unlock();
+    }
+    // executor may already be null, e.g. if stopCQScheduler() is called twice in a row
+    if (previous != null) {
+      previous.shutdown();
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index fffbffc110..22765c622e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.cq.TimeoutPolicy;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
@@ -65,17 +66,74 @@ public class CQScheduleTask implements Runnable {
       String md5,
       ScheduledExecutorService executor,
       ConfigManager configManager) {
-    this.cqId = req.cqId;
-    this.everyInterval = req.everyInterval;
-    this.startTimeOffset = req.startTimeOffset;
-    this.endTimeOffset = req.endTimeOffset;
-    this.timeoutPolicy = TimeoutPolicy.deserialize(req.timeoutPolicy);
-    this.queryBody = req.queryBody;
+    this(
+        req.cqId,
+        req.everyInterval,
+        req.startTimeOffset,
+        req.endTimeOffset,
+        TimeoutPolicy.deserialize(req.timeoutPolicy),
+        req.queryBody,
+        md5,
+        executor,
+        configManager,
+        firstExecutionTime);
+  }
+
+  /**
+   * Creates a task that resumes the scheduling of an already existing CQ, e.g. when a ConfigNode
+   * becomes leader and recovers the active CQs.
+   *
+   * @param entry CQ entry read from {@code CQInfo}; the task's execution time is set to the entry's
+   *     last execution time plus its {@code everyInterval}
+   */
+  public CQScheduleTask(
+      CQInfo.CQEntry entry, ScheduledExecutorService executor, ConfigManager configManager) {
+    this(
+        entry.getCqId(),
+        entry.getEveryInterval(),
+        entry.getStartTimeOffset(),
+        entry.getEndTimeOffset(),
+        entry.getTimeoutPolicy(),
+        entry.getQueryBody(),
+        entry.getMd5(),
+        executor,
+        configManager,
+        nextExecutionTimeOf(entry));
+  }
+
+  /** Returns the execution time of a recovered CQ: its last execution time plus one interval. */
+  private static long nextExecutionTimeOf(CQInfo.CQEntry entry) {
+    return entry.getLastExecutionTime() + entry.getEveryInterval();
+  }
+
+  /**
+   * Creates a task from explicit CQ attributes; the other constructors delegate here.
+   *
+   * @param executionTime initial value of the task's execution time ({@code firstExecutionTime}
+   *     for a newly created CQ, last execution time plus {@code everyInterval} for a recovered one)
+   */
+  public CQScheduleTask(
+      String cqId,
+      long everyInterval,
+      long startTimeOffset,
+      long endTimeOffset,
+      TimeoutPolicy timeoutPolicy,
+      String queryBody,
+      String md5,
+      ScheduledExecutorService executor,
+      ConfigManager configManager,
+      long executionTime) {
+    this.cqId = cqId;
+    this.everyInterval = everyInterval;
+    this.startTimeOffset = startTimeOffset;
+    this.endTimeOffset = endTimeOffset;
+    this.timeoutPolicy = timeoutPolicy;
+    this.queryBody = queryBody;
     this.md5 = md5;
     this.executor = executor;
     this.configManager = configManager;
     this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
-    this.executionTime = firstExecutionTime;
+    this.executionTime = executionTime;
   }
 
   public static long getFirstExecutionTime(long boundaryTime, long everyInterval) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index e8cc2695d6..d9e5937ee9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
 import org.apache.iotdb.confignode.consensus.response.ShowCQResp;
-import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -132,9 +131,7 @@ public class CQInfo implements SnapshotProcessor {
     try {
       return new ShowCQResp(
           new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-          cqMap.values().stream()
-              .map(entry -> new TCQEntry(entry.cqId, entry.sql, entry.state.getType()))
-              .collect(Collectors.toList()));
+          cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
     } finally {
       lock.readLock().unlock();
     }
@@ -267,7 +264,7 @@ public class CQInfo implements SnapshotProcessor {
     cqMap.clear();
   }
 
-  private static class CQEntry {
+  public static class CQEntry {
     private final String cqId;
     private final long everyInterval;
     private final long boundaryTime;
@@ -285,7 +282,7 @@ public class CQInfo implements SnapshotProcessor {
       this(
           req.cqId,
           req.everyInterval,
-          req.everyInterval,
+          req.boundaryTime,
           req.startTimeOffset,
           req.endTimeOffset,
           TimeoutPolicy.deserialize(req.timeoutPolicy),
@@ -296,6 +293,25 @@ public class CQInfo implements SnapshotProcessor {
           lastExecutionTime);
     }
 
+    /**
+     * Copy constructor: builds a new {@code CQEntry} holding the same field values as {@code
+     * other}. CQInfo uses it to hand out copies of the entries stored in {@code cqMap}.
+     */
+    private CQEntry(CQEntry other) {
+      this(
+          other.getCqId(),
+          other.getEveryInterval(),
+          other.getBoundaryTime(),
+          other.getStartTimeOffset(),
+          other.getEndTimeOffset(),
+          other.getTimeoutPolicy(),
+          other.getQueryBody(),
+          other.getSql(),
+          other.getMd5(),
+          other.getState(),
+          other.getLastExecutionTime());
+    }
+
     private CQEntry(
         String cqId,
         long everyInterval,
@@ -360,5 +372,50 @@ public class CQInfo implements SnapshotProcessor {
           state,
           lastExecutionTime);
     }
+
+    // Read-only accessors (used by ShowCQResp, CQManager and CQScheduleTask).
+    public String getCqId() {
+      return this.cqId;
+    }
+
+    public long getEveryInterval() {
+      return this.everyInterval;
+    }
+
+    public long getBoundaryTime() {
+      return this.boundaryTime;
+    }
+
+    public long getStartTimeOffset() {
+      return this.startTimeOffset;
+    }
+
+    public long getEndTimeOffset() {
+      return this.endTimeOffset;
+    }
+
+    public TimeoutPolicy getTimeoutPolicy() {
+      return this.timeoutPolicy;
+    }
+
+    public String getQueryBody() {
+      return this.queryBody;
+    }
+
+    public String getSql() {
+      return this.sql;
+    }
+
+    public String getMd5() {
+      return this.md5;
+    }
+
+    public CQState getState() {
+      return this.state;
+    }
+
+    public long getLastExecutionTime() {
+      return this.lastExecutionTime;
+    }
   }
 }