You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/22 09:55:21 UTC
[iotdb] branch master updated: [IOTDB-5516] Try sync delete schema region when dropping database (#9108)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb7973f564 [IOTDB-5516] Try sync delete schema region when dropping database (#9108)
bb7973f564 is described below
commit bb7973f564331f9416708796259a09cd6ccb7df0
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Feb 22 17:55:14 2023 +0800
[IOTDB-5516] Try sync delete schema region when dropping database (#9108)
---
.../client/async/AsyncDataNodeClientPool.java | 7 ++
.../impl/schema/DeleteStorageGroupProcedure.java | 83 ++++++++++++++++++----
2 files changed, 78 insertions(+), 12 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 097753ccbd..6b02149ec3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.client.async;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -146,6 +147,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case DELETE_REGION:
+ client.deleteRegion(
+ (TConsensusGroupId) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case CREATE_SCHEMA_REGION:
client.createSchemaRegion(
(TCreateSchemaRegionReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
index 89ecff9aa2..c00c90d816 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
@@ -19,10 +19,16 @@
package org.apache.iotdb.confignode.procedure.impl.schema;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -43,7 +49,10 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class DeleteStorageGroupProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, DeleteStorageGroupState> {
@@ -98,19 +107,13 @@ public class DeleteStorageGroupProcedure
LOG.info("Delete config info of {}", deleteSgSchema.getName());
// Submit RegionDeleteTasks
- OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
+ OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan =
+ new OfferRegionMaintainTasksPlan();
List<TRegionReplicaSet> regionReplicaSets =
env.getAllReplicaSets(deleteSgSchema.getName());
+ List<TRegionReplicaSet> schemaRegionReplicaSets = new ArrayList<>();
regionReplicaSets.forEach(
regionReplicaSet -> {
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- targetDataNode ->
- offerPlan.appendRegionMaintainTask(
- new RegionDeleteTask(
- targetDataNode, regionReplicaSet.getRegionId())));
-
// Clear heartbeat cache along the way
env.getConfigManager()
.getPartitionManager()
@@ -120,13 +123,69 @@ public class DeleteStorageGroupProcedure
.getRouteBalancer()
.getRegionRouteMap()
.removeRegionRouteCache(regionReplicaSet.getRegionId());
+
+ if (regionReplicaSet
+ .getRegionId()
+ .getType()
+ .equals(TConsensusGroupType.SchemaRegion)) {
+ schemaRegionReplicaSets.add(regionReplicaSet);
+ } else {
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ targetDataNode ->
+ dataRegionDeleteTaskOfferPlan.appendRegionMaintainTask(
+ new RegionDeleteTask(
+ targetDataNode, regionReplicaSet.getRegionId())));
+ }
});
- env.getConfigManager().getConsensusManager().write(offerPlan);
+
+ if (!dataRegionDeleteTaskOfferPlan.getRegionMaintainTaskList().isEmpty()) {
+ // submit async data region delete task
+ env.getConfigManager().getConsensusManager().write(dataRegionDeleteTaskOfferPlan);
+ }
// Delete StorageGroupPartitionTable
- TSStatus status = env.deleteConfig(deleteSgSchema.getName());
+ TSStatus deleteConfigResult = env.deleteConfig(deleteSgSchema.getName());
+
+ // try sync delete schema region
+ AsyncClientHandler<TConsensusGroupId, TSStatus> asyncClientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.DELETE_REGION);
+ Map<Integer, RegionDeleteTask> schemaRegionDeleteTaskMap = new HashMap<>();
+ int requestIndex = 0;
+ for (TRegionReplicaSet schemaRegionReplicaSet : schemaRegionReplicaSets) {
+ asyncClientHandler.putRequest(requestIndex, schemaRegionReplicaSet.getRegionId());
+ for (TDataNodeLocation dataNodeLocation :
+ schemaRegionReplicaSet.getDataNodeLocations()) {
+ asyncClientHandler.putDataNodeLocation(requestIndex, dataNodeLocation);
+ schemaRegionDeleteTaskMap.put(
+ requestIndex,
+ new RegionDeleteTask(dataNodeLocation, schemaRegionReplicaSet.getRegionId()));
+ }
+ requestIndex++;
+ }
+ if (!schemaRegionDeleteTaskMap.isEmpty()) {
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
+ for (Map.Entry<Integer, TSStatus> entry :
+ asyncClientHandler.getResponseMap().entrySet()) {
+ if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ schemaRegionDeleteTaskMap.remove(entry.getKey());
+ }
+ }
+
+ if (!schemaRegionDeleteTaskMap.isEmpty()) {
+ // submit async schema region delete task for failed sync execution
+ OfferRegionMaintainTasksPlan schemaRegionDeleteTaskOfferPlan =
+ new OfferRegionMaintainTasksPlan();
+ schemaRegionDeleteTaskMap
+ .values()
+ .forEach(schemaRegionDeleteTaskOfferPlan::appendRegionMaintainTask);
+ env.getConfigManager().getConsensusManager().write(schemaRegionDeleteTaskOfferPlan);
+ }
+ }
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (deleteConfigResult.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Flow.NO_MORE_STATE;
} else if (getCycles() > RETRY_THRESHOLD) {
setFailure(new ProcedureException("Delete config info id failed"));