You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/22 09:55:21 UTC

[iotdb] branch master updated: [IOTDB-5516] Try sync delete schema region when dropping database (#9108)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bb7973f564 [IOTDB-5516] Try sync delete schema region when dropping database (#9108)
bb7973f564 is described below

commit bb7973f564331f9416708796259a09cd6ccb7df0
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Feb 22 17:55:14 2023 +0800

    [IOTDB-5516] Try sync delete schema region when dropping database (#9108)
---
 .../client/async/AsyncDataNodeClientPool.java      |  7 ++
 .../impl/schema/DeleteStorageGroupProcedure.java   | 83 ++++++++++++++++++----
 2 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 097753ccbd..6b02149ec3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.client.async;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -146,6 +147,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case DELETE_REGION:
+          client.deleteRegion(
+              (TConsensusGroupId) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         case CREATE_SCHEMA_REGION:
           client.createSchemaRegion(
               (TCreateSchemaRegionReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
index 89ecff9aa2..c00c90d816 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
@@ -19,10 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure.impl.schema;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -43,7 +49,10 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class DeleteStorageGroupProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, DeleteStorageGroupState> {
@@ -98,19 +107,13 @@ public class DeleteStorageGroupProcedure
           LOG.info("Delete config info of {}", deleteSgSchema.getName());
 
           // Submit RegionDeleteTasks
-          OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
+          OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan =
+              new OfferRegionMaintainTasksPlan();
           List<TRegionReplicaSet> regionReplicaSets =
               env.getAllReplicaSets(deleteSgSchema.getName());
+          List<TRegionReplicaSet> schemaRegionReplicaSets = new ArrayList<>();
           regionReplicaSets.forEach(
               regionReplicaSet -> {
-                regionReplicaSet
-                    .getDataNodeLocations()
-                    .forEach(
-                        targetDataNode ->
-                            offerPlan.appendRegionMaintainTask(
-                                new RegionDeleteTask(
-                                    targetDataNode, regionReplicaSet.getRegionId())));
-
                 // Clear heartbeat cache along the way
                 env.getConfigManager()
                     .getPartitionManager()
@@ -120,13 +123,69 @@ public class DeleteStorageGroupProcedure
                     .getRouteBalancer()
                     .getRegionRouteMap()
                     .removeRegionRouteCache(regionReplicaSet.getRegionId());
+
+                if (regionReplicaSet
+                    .getRegionId()
+                    .getType()
+                    .equals(TConsensusGroupType.SchemaRegion)) {
+                  schemaRegionReplicaSets.add(regionReplicaSet);
+                } else {
+                  regionReplicaSet
+                      .getDataNodeLocations()
+                      .forEach(
+                          targetDataNode ->
+                              dataRegionDeleteTaskOfferPlan.appendRegionMaintainTask(
+                                  new RegionDeleteTask(
+                                      targetDataNode, regionReplicaSet.getRegionId())));
+                }
               });
-          env.getConfigManager().getConsensusManager().write(offerPlan);
+
+          if (!dataRegionDeleteTaskOfferPlan.getRegionMaintainTaskList().isEmpty()) {
+            // submit async data region delete task
+            env.getConfigManager().getConsensusManager().write(dataRegionDeleteTaskOfferPlan);
+          }
 
           // Delete StorageGroupPartitionTable
-          TSStatus status = env.deleteConfig(deleteSgSchema.getName());
+          TSStatus deleteConfigResult = env.deleteConfig(deleteSgSchema.getName());
+
+          // try sync delete schema region
+          AsyncClientHandler<TConsensusGroupId, TSStatus> asyncClientHandler =
+              new AsyncClientHandler<>(DataNodeRequestType.DELETE_REGION);
+          Map<Integer, RegionDeleteTask> schemaRegionDeleteTaskMap = new HashMap<>();
+          int requestIndex = 0;
+          for (TRegionReplicaSet schemaRegionReplicaSet : schemaRegionReplicaSets) {
+            for (TDataNodeLocation dataNodeLocation :
+                schemaRegionReplicaSet.getDataNodeLocations()) {
+              asyncClientHandler.putRequest(requestIndex, schemaRegionReplicaSet.getRegionId());
+              asyncClientHandler.putDataNodeLocation(requestIndex, dataNodeLocation);
+              schemaRegionDeleteTaskMap.put(
+                  requestIndex,
+                  new RegionDeleteTask(dataNodeLocation, schemaRegionReplicaSet.getRegionId()));
+              requestIndex++; // unique id per replica so entries are not overwritten
+            }
+          }
+          if (!schemaRegionDeleteTaskMap.isEmpty()) {
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
+            for (Map.Entry<Integer, TSStatus> entry :
+                asyncClientHandler.getResponseMap().entrySet()) {
+              if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                schemaRegionDeleteTaskMap.remove(entry.getKey());
+              }
+            }
+
+            if (!schemaRegionDeleteTaskMap.isEmpty()) {
+              // submit async schema region delete task for failed sync execution
+              OfferRegionMaintainTasksPlan schemaRegionDeleteTaskOfferPlan =
+                  new OfferRegionMaintainTasksPlan();
+              schemaRegionDeleteTaskMap
+                  .values()
+                  .forEach(schemaRegionDeleteTaskOfferPlan::appendRegionMaintainTask);
+              env.getConfigManager().getConsensusManager().write(schemaRegionDeleteTaskOfferPlan);
+            }
+          }
 
-          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          if (deleteConfigResult.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             return Flow.NO_MORE_STATE;
           } else if (getCycles() > RETRY_THRESHOLD) {
             setFailure(new ProcedureException("Delete config info id failed"));