You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/13 10:09:52 UTC
[iotdb] branch IOTDB-4619 updated: Finish recovery phase
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new 9138c5babd Finish recovery phase
9138c5babd is described below
commit 9138c5babdc5baf87a92dd50484c304ac99881eb
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 13 18:06:31 2022 +0800
Finish recovery phase
---
.../confignode/consensus/response/ShowCQResp.java | 16 ++++-
.../statemachine/PartitionRegionStateMachine.java | 2 +
.../iotdb/confignode/manager/cq/CQManager.java | 61 ++++++++++++++++++-
.../confignode/manager/cq/CQScheduleTask.java | 54 ++++++++++++++---
.../iotdb/confignode/persistence/cq/CQInfo.java | 68 ++++++++++++++++++++--
5 files changed, 184 insertions(+), 17 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
index 36d9ced1d9..44a8de0080 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ShowCQResp.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.consensus.common.DataSet;
@@ -26,18 +27,27 @@ import org.apache.iotdb.consensus.common.DataSet;
import javax.validation.constraints.NotNull;
import java.util.List;
+import java.util.stream.Collectors;
public class ShowCQResp implements DataSet {
private final TSStatus status;
- private final List<TCQEntry> cqList;
+ private final List<CQInfo.CQEntry> cqList;
- public ShowCQResp(@NotNull TSStatus status, @NotNull List<TCQEntry> cqList) {
+ public ShowCQResp(@NotNull TSStatus status, @NotNull List<CQInfo.CQEntry> cqList) {
this.status = status;
this.cqList = cqList;
}
public TShowCQResp convertToRpcShowCQResp() {
- return new TShowCQResp(status, cqList);
+ return new TShowCQResp(
+ status,
+ cqList.stream()
+ .map(entry -> new TCQEntry(entry.getCqId(), entry.getSql(), entry.getState().getType()))
+ .collect(Collectors.toList()));
+ }
+
+ public List<CQInfo.CQEntry> getCqList() {
+ return cqList;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index b45c933671..be4936ad5e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -152,6 +152,7 @@ public class PartitionRegionStateMachine
configManager.getLoadManager().startLoadBalancingService();
configManager.getNodeManager().startHeartbeatService();
configManager.getPartitionManager().startRegionCleaner();
+ configManager.getCQManager().startCQScheduler();
} else {
LOGGER.info(
"Current node {} is not longer the leader, the new leader is {}", currentNode, newLeader);
@@ -159,6 +160,7 @@ public class PartitionRegionStateMachine
configManager.getLoadManager().stopLoadBalancingService();
configManager.getNodeManager().stopHeartbeatService();
configManager.getPartitionManager().stopRegionCleaner();
+ configManager.getCQManager().stopCQScheduler();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 9914369a1c..b0995f4496 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -20,12 +20,14 @@ package org.apache.iotdb.confignode.manager.cq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.cq.CQState;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.response.ShowCQResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
@@ -37,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -105,8 +108,64 @@ public class CQManager {
res = executor;
} finally {
lock.readLock().unlock();
- ;
}
return res;
}
+
+ public void startCQScheduler() {
+ lock.writeLock().lock();
+ try {
+ // 1. shutdown previous cq schedule thread pool
+ try {
+ executor.shutdown();
+ } catch (Throwable t) {
+ // just print the error log because we should make sure we can start a new cq schedule pool
+ // successfully in the next steps
+ LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t);
+ }
+
+ // 2. start a new schedule thread pool
+ executor =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(CONF.getCqSubmitThread(), "CQ-Scheduler");
+
+ // 3. get all CQs
+ List<CQInfo.CQEntry> allCQs = null;
+ // keep fetching until we get all CQEntries if this node is still leader
+ while (allCQs == null && configManager.getConsensusManager().isLeader()) {
+ ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan());
+ if (response.getDataset() != null) {
+ allCQs = ((ShowCQResp) response.getDataset()).getCqList();
+ } else {
+ // consensus layer related errors
+ LOGGER.warn(
+ "Unexpected error happened while fetching cq list: ", response.getException());
+ }
+ }
+
+ // 4. recover the scheduling of active CQs
+ if (allCQs != null) {
+ for (CQInfo.CQEntry entry : allCQs) {
+ if (entry.getState() == CQState.ACTIVE) {
+ CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
+ cqScheduleTask.submitSelf();
+ }
+ }
+ }
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void stopCQScheduler() {
+ ScheduledExecutorService previous;
+ lock.writeLock().lock();
+ try {
+ previous = executor;
+ executor = null;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ previous.shutdown();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index fffbffc110..22765c622e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.cq.TimeoutPolicy;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
@@ -65,17 +66,56 @@ public class CQScheduleTask implements Runnable {
String md5,
ScheduledExecutorService executor,
ConfigManager configManager) {
- this.cqId = req.cqId;
- this.everyInterval = req.everyInterval;
- this.startTimeOffset = req.startTimeOffset;
- this.endTimeOffset = req.endTimeOffset;
- this.timeoutPolicy = TimeoutPolicy.deserialize(req.timeoutPolicy);
- this.queryBody = req.queryBody;
+ this(
+ req.cqId,
+ req.everyInterval,
+ req.startTimeOffset,
+ req.endTimeOffset,
+ TimeoutPolicy.deserialize(req.timeoutPolicy),
+ req.queryBody,
+ md5,
+ executor,
+ configManager,
+ firstExecutionTime);
+ }
+
+ public CQScheduleTask(
+ CQInfo.CQEntry entry, ScheduledExecutorService executor, ConfigManager configManager) {
+ this(
+ entry.getCqId(),
+ entry.getEveryInterval(),
+ entry.getStartTimeOffset(),
+ entry.getEndTimeOffset(),
+ entry.getTimeoutPolicy(),
+ entry.getQueryBody(),
+ entry.getMd5(),
+ executor,
+ configManager,
+ entry.getLastExecutionTime() + entry.getEveryInterval());
+ }
+
+ public CQScheduleTask(
+ String cqId,
+ long everyInterval,
+ long startTimeOffset,
+ long endTimeOffset,
+ TimeoutPolicy timeoutPolicy,
+ String queryBody,
+ String md5,
+ ScheduledExecutorService executor,
+ ConfigManager configManager,
+ long executionTime) {
+ this.cqId = cqId;
+ this.everyInterval = everyInterval;
+ this.startTimeOffset = startTimeOffset;
+ this.endTimeOffset = endTimeOffset;
+ this.timeoutPolicy = timeoutPolicy;
+ this.queryBody = queryBody;
this.md5 = md5;
this.executor = executor;
this.configManager = configManager;
this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
- this.executionTime = firstExecutionTime;
+ this.executionTime = executionTime;
}
public static long getFirstExecutionTime(long boundaryTime, long everyInterval) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index e8cc2695d6..d9e5937ee9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
import org.apache.iotdb.confignode.consensus.response.ShowCQResp;
-import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -132,9 +131,7 @@ public class CQInfo implements SnapshotProcessor {
try {
return new ShowCQResp(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
- cqMap.values().stream()
- .map(entry -> new TCQEntry(entry.cqId, entry.sql, entry.state.getType()))
- .collect(Collectors.toList()));
+ cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
} finally {
lock.readLock().unlock();
}
@@ -267,7 +264,7 @@ public class CQInfo implements SnapshotProcessor {
cqMap.clear();
}
- private static class CQEntry {
+ public static class CQEntry {
private final String cqId;
private final long everyInterval;
private final long boundaryTime;
@@ -285,7 +282,7 @@ public class CQInfo implements SnapshotProcessor {
this(
req.cqId,
req.everyInterval,
- req.everyInterval,
+ req.boundaryTime,
req.startTimeOffset,
req.endTimeOffset,
TimeoutPolicy.deserialize(req.timeoutPolicy),
@@ -296,6 +293,21 @@ public class CQInfo implements SnapshotProcessor {
lastExecutionTime);
}
+ private CQEntry(CQEntry other) {
+ this(
+ other.cqId,
+ other.everyInterval,
+ other.boundaryTime,
+ other.startTimeOffset,
+ other.endTimeOffset,
+ other.timeoutPolicy,
+ other.queryBody,
+ other.sql,
+ other.md5,
+ other.state,
+ other.lastExecutionTime);
+ }
+
private CQEntry(
String cqId,
long everyInterval,
@@ -360,5 +372,49 @@ public class CQInfo implements SnapshotProcessor {
state,
lastExecutionTime);
}
+
+ public String getCqId() {
+ return cqId;
+ }
+
+ public long getEveryInterval() {
+ return everyInterval;
+ }
+
+ public long getBoundaryTime() {
+ return boundaryTime;
+ }
+
+ public long getStartTimeOffset() {
+ return startTimeOffset;
+ }
+
+ public long getEndTimeOffset() {
+ return endTimeOffset;
+ }
+
+ public TimeoutPolicy getTimeoutPolicy() {
+ return timeoutPolicy;
+ }
+
+ public String getQueryBody() {
+ return queryBody;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public String getMd5() {
+ return md5;
+ }
+
+ public CQState getState() {
+ return state;
+ }
+
+ public long getLastExecutionTime() {
+ return lastExecutionTime;
+ }
}
}