You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/05/24 09:18:11 UTC
[iotdb] branch master updated: [IOTDB-2919] Refactor delete storage group (#5994)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c0362c2f4e [IOTDB-2919] Refactor delete storage group (#5994)
c0362c2f4e is described below
commit c0362c2f4e6519c2b5c26aa4010a26355232a3f6
Author: cmlmakahts <82...@users.noreply.github.com>
AuthorDate: Tue May 24 17:18:03 2022 +0800
[IOTDB-2919] Refactor delete storage group (#5994)
[IOTDB-2919] Refactor delete storage group (#5994)
---
.../confignode/client/SyncDataNodeClientPool.java | 139 +++++++++++++++++
.../consensus/request/ConfigRequest.java | 4 +
.../consensus/request/ConfigRequestType.java | 1 +
.../request/write/PreDeleteStorageGroupReq.java | 86 +++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 1 +
.../iotdb/confignode/manager/PartitionManager.java | 37 +++++
.../iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../confignode/persistence/PartitionInfo.java | 58 ++++++-
.../executor/ConfigRequestExecutor.java | 3 +
.../procedure/env/ConfigNodeProcedureEnv.java | 112 +++++++++-----
.../impl/DeleteStorageGroupProcedure.java | 169 ++++++---------------
.../procedure/state/DeleteStorageGroupState.java | 7 +-
.../confignode/persistence/PartitionInfoTest.java | 5 +
.../confignode/procedure/TestProcedureBase.java | 1 +
.../procedure/TestProcedureExecutor.java | 1 +
.../confignode/procedure/TestSTMProcedure.java | 1 +
.../confignode/procedure/entity/IncProcedure.java | 2 +-
.../confignode/procedure/entity/NoopProcedure.java | 2 +-
.../procedure/entity/SimpleLockProcedure.java | 2 +-
.../procedure/entity/SimpleSTMProcedure.java | 2 +-
.../procedure/entity/SleepProcedure.java | 2 +-
.../procedure/entity/StuckProcedure.java | 2 +-
.../procedure/entity/StuckSTMProcedure.java | 2 +-
.../procedure/env/TestConfigNodeEnv.java} | 10 +-
.../procedure/{ => env}/TestProcEnv.java | 2 +-
.../procedure/store/TestProcedureStore.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 125 ++++++++-------
.../confignode1conf/iotdb-confignode.properties | 9 ++
.../iotdb/commons/partition/DataPartition.java | 7 +-
.../iotdb/commons/partition/SchemaPartition.java | 16 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 3 +-
31 files changed, 559 insertions(+), 256 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
new file mode 100644
index 0000000000..62c7d2a16f
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Synchronously send RPC requests to DataNodes. See mpp.thrift for more details.
+ *
+ * <p>Every client is borrowed from {@code clientManager} inside a try-with-resources statement, so
+ * it is closed as soon as the request finishes, whether it succeeded or failed.
+ */
+public class SyncDataNodeClientPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataNodeClientPool.class);
+
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
+
+  private SyncDataNodeClientPool() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ConfigNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Ask one DataNode to invalidate its partition cache.
+   *
+   * @param endPoint internal endpoint of the target DataNode
+   * @param invalidateCacheReq the invalidation request to forward
+   * @return the status returned by the DataNode; a {@code TIME_OUT} status when the client cannot
+   *     be borrowed or closed ({@link IOException}); an {@code EXECUTE_STATEMENT_ERROR} status
+   *     when the RPC itself fails ({@link TException})
+   */
+  public TSStatus invalidatePartitionCache(
+      TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
+    TSStatus status;
+    // try-with-resources, like the sibling methods: the borrowed client must not be leaked
+    try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+      status = client.invalidatePartitionCache(invalidateCacheReq);
+      LOGGER.info(
+          "Invalidate Partition Cache {} on DataNode {} finished with status {}",
+          invalidateCacheReq,
+          endPoint,
+          status);
+    } catch (IOException e) {
+      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+      status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
+    } catch (TException e) {
+      LOGGER.error("Invalidate Partition Cache on DataNode {} failed", endPoint, e);
+      status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    }
+    return status;
+  }
+
+  /**
+   * Ask one DataNode to invalidate its schema cache.
+   *
+   * @param endPoint internal endpoint of the target DataNode
+   * @param invalidateCacheReq the invalidation request to forward
+   * @return the status returned by the DataNode; a {@code TIME_OUT} status on {@link IOException};
+   *     an {@code EXECUTE_STATEMENT_ERROR} status on {@link TException}
+   */
+  public TSStatus invalidateSchemaCache(
+      TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
+    TSStatus status;
+    try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+      status = client.invalidateSchemaCache(invalidateCacheReq);
+    } catch (IOException e) {
+      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+      status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
+    } catch (TException e) {
+      LOGGER.error("Invalid Schema Cache on DataNode {} failed", endPoint, e);
+      status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    }
+    return status;
+  }
+
+  /**
+   * Send a delete request for every region in the given set.
+   *
+   * <p>Regions are grouped by the first DataNode location of their replica set and one client per
+   * DataNode is used for the whole group. Each region whose deletion returns a success status is
+   * removed from {@code deletedRegionSet}; the others stay in the set so that a later call can try
+   * again.
+   *
+   * @param deletedRegionSet the regions to delete; this set is modified in place
+   */
+  public void deleteRegions(Set<TRegionReplicaSet> deletedRegionSet) {
+    // A null entry can never be deleted and would only cause NullPointerExceptions below.
+    deletedRegionSet.removeIf(replicaSet -> replicaSet == null);
+
+    Map<TDataNodeLocation, List<TConsensusGroupId>> regionLocationMap = new HashMap<>();
+    for (TRegionReplicaSet replicaSet : deletedRegionSet) {
+      final List<TDataNodeLocation> dataNodeLocations = replicaSet.getDataNodeLocations();
+      if (dataNodeLocations == null || dataNodeLocations.isEmpty()) {
+        // Nothing to contact for this region; keep it in the set and move on.
+        LOGGER.warn(
+            "Region {} has no DataNode location, skip deleting it", replicaSet.getRegionId());
+        continue;
+      }
+      regionLocationMap
+          .computeIfAbsent(dataNodeLocations.get(0), k -> new ArrayList<>())
+          .add(replicaSet.getRegionId());
+    }
+    LOGGER.info("Current regionLocationMap {} ", regionLocationMap);
+    regionLocationMap.forEach(
+        (dataNodeLocation, regionIds) ->
+            deleteRegions(dataNodeLocation.getInternalEndPoint(), regionIds, deletedRegionSet));
+  }
+
+  /**
+   * Delete the given regions on a single DataNode, removing every successfully deleted region
+   * from {@code deletedRegionSet}. Connection and RPC failures are logged and abort the remaining
+   * deletions for this DataNode.
+   */
+  private void deleteRegions(
+      TEndPoint endPoint,
+      List<TConsensusGroupId> regionIds,
+      Set<TRegionReplicaSet> deletedRegionSet) {
+    try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+      for (TConsensusGroupId regionId : regionIds) {
+        LOGGER.debug("Delete region {} ", regionId);
+        final TSStatus status = client.deleteRegion(regionId);
+        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          LOGGER.info("DELETE Region {} successfully", regionId);
+          deletedRegionSet.removeIf(k -> regionId.equals(k.getRegionId()));
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+    } catch (TException e) {
+      LOGGER.error("Delete Region on DataNode {} failed", endPoint, e);
+    }
+  }
+
+  // TODO: Must the ClientPool be a singleton?
+  private static class ClientPoolHolder {
+
+    private static final SyncDataNodeClientPool INSTANCE = new SyncDataNodeClientPool();
+
+    private ClientPoolHolder() {
+      // Empty constructor
+    }
+  }
+
+  /** Returns the single shared instance held by {@code ClientPoolHolder}. */
+  public static SyncDataNodeClientPool getInstance() {
+    return ClientPoolHolder.INSTANCE;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 26cd531973..50aa61adea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartition
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
@@ -158,6 +159,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
case UpdateProcedure:
req = new UpdateProcedureReq();
break;
+ case PreDeleteStorageGroup:
+ req = new PreDeleteStorageGroupReq();
+ break;
case DeleteStorageGroup:
req = new DeleteStorageGroupReq();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index 78a69f0ffa..a4b1606371 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -27,6 +27,7 @@ public enum ConfigRequestType {
SetDataReplicationFactor,
SetTimePartitionInterval,
DeleteStorageGroup,
+ PreDeleteStorageGroup,
GetStorageGroup,
CountStorageGroup,
CreateRegions,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/PreDeleteStorageGroupReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/PreDeleteStorageGroupReq.java
new file mode 100644
index 0000000000..d0aab342a8
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/PreDeleteStorageGroupReq.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Consensus write request that marks a storage group as pre-deleted ({@code EXECUTE}) or removes
+ * that mark again ({@code ROLLBACK}).
+ *
+ * <p>Serialized layout: request type ordinal (int), storage group name, then one byte holding the
+ * {@link PreDeleteType} code.
+ */
+public class PreDeleteStorageGroupReq extends ConfigRequest {
+
+  /** The two directions of a pre-delete request and the byte each one is serialized as. */
+  public enum PreDeleteType {
+    EXECUTE((byte) 0),
+    ROLLBACK((byte) 1);
+
+    private final byte type;
+
+    PreDeleteType(byte type) {
+      this.type = type;
+    }
+
+    public byte getType() {
+      return type;
+    }
+  }
+
+  private String storageGroup;
+  private PreDeleteType preDeleteType;
+
+  /** Creates an empty request, to be filled in by {@code deserializeImpl} or the setters. */
+  public PreDeleteStorageGroupReq() {
+    super(ConfigRequestType.PreDeleteStorageGroup);
+  }
+
+  /**
+   * @param storageGroup name of the storage group
+   * @param preDeleteType whether to set or to roll back the pre-delete mark
+   */
+  public PreDeleteStorageGroupReq(String storageGroup, PreDeleteType preDeleteType) {
+    super(ConfigRequestType.PreDeleteStorageGroup);
+    this.storageGroup = storageGroup;
+    this.preDeleteType = preDeleteType;
+  }
+
+  public String getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(String storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public PreDeleteType getPreDeleteType() {
+    return preDeleteType;
+  }
+
+  public void setPreDeleteType(PreDeleteType preDeleteType) {
+    this.preDeleteType = preDeleteType;
+  }
+
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    final int requestTypeOrdinal = ConfigRequestType.PreDeleteStorageGroup.ordinal();
+    buffer.putInt(requestTypeOrdinal);
+    BasicStructureSerDeUtil.write(storageGroup, buffer);
+    final byte typeCode = preDeleteType.getType();
+    buffer.put(typeCode);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    this.storageGroup = BasicStructureSerDeUtil.readString(buffer);
+    final byte typeCode = buffer.get();
+    // Only the ROLLBACK code is recognised explicitly; every other byte decodes to EXECUTE.
+    if (typeCode == PreDeleteType.ROLLBACK.getType()) {
+      this.preDeleteType = PreDeleteType.ROLLBACK;
+    } else {
+      this.preDeleteType = PreDeleteType.EXECUTE;
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 31390458d2..9b2001ee0a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -135,6 +135,7 @@ public class ConfigManager implements Manager {
public void close() throws IOException {
consensusManager.close();
+ partitionManager.getRegionCleaner().shutdown();
procedureManager.shiftExecutor(false);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 6384604c15..03822abecc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetChildNodesPartitionReq;
@@ -35,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -54,6 +57,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
public class PartitionManager {
@@ -65,12 +72,21 @@ public class PartitionManager {
private SeriesPartitionExecutor executor;
+ private final ScheduledExecutorService regionCleaner;
+
public PartitionManager(Manager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
this.partitionInfo = partitionInfo;
+ this.regionCleaner =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-StorageGroup-Cleaner");
+ regionCleaner.scheduleAtFixedRate(this::clearDeletedStorageGroup, 10, 5, TimeUnit.SECONDS);
setSeriesPartitionExecutor();
}
+ /**
+ * Returns the scheduled executor that the constructor creates and on which it schedules
+ * clearDeletedStorageGroup at a fixed rate.
+ */
+ public ScheduledExecutorService getRegionCleaner() {
+ return regionCleaner;
+ }
+
/**
* Get SchemaPartition
*
@@ -365,6 +381,27 @@ public class PartitionManager {
return schemaNodeManagementResp;
}
+  /**
+   * Writes a {@link PreDeleteStorageGroupReq} for the given storage group through the consensus
+   * manager.
+   *
+   * @param storageGroup name of the storage group
+   * @param preDeleteType whether to set or to roll back the pre-delete mark
+   */
+  public void preDeleteStorageGroup(
+      String storageGroup, PreDeleteStorageGroupReq.PreDeleteType preDeleteType) {
+    getConsensusManager().write(new PreDeleteStorageGroupReq(storageGroup, preDeleteType));
+  }
+
+  /**
+   * Periodic task run on {@code regionCleaner}: when this node is the consensus leader and there
+   * are regions waiting for deletion, ask the DataNodes to delete them.
+   *
+   * <p>Runtime failures are caught and logged here on purpose. The task is scheduled with {@code
+   * scheduleAtFixedRate}, which suppresses all subsequent executions once a run throws, so a
+   * single bad round must not escape this method.
+   */
+  private void clearDeletedStorageGroup() {
+    try {
+      if (!getConsensusManager().isLeader()) {
+        return;
+      }
+      final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
+      if (deletedRegionSet.isEmpty()) {
+        return;
+      }
+      LOGGER.info(
+          "DELETE REGIONS {} START",
+          deletedRegionSet.stream()
+              .map(TRegionReplicaSet::getRegionId)
+              .collect(Collectors.toList()));
+      SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
+    } catch (RuntimeException e) {
+      LOGGER.error("Clearing deleted regions failed, will retry in the next round", e);
+    }
+  }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 45f8d8dd5e..c3cae2178b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -90,7 +90,7 @@ public class ProcedureManager {
List<Long> procIdList = new ArrayList<>();
for (TStorageGroupSchema storageGroupSchema : deleteSgSchemaList) {
DeleteStorageGroupProcedure deleteStorageGroupProcedure =
- new DeleteStorageGroupProcedure(storageGroupSchema, configManager);
+ new DeleteStorageGroupProcedure(storageGroupSchema);
long procId = this.executor.submitProcedure(deleteStorageGroupProcedure);
procIdList.add(procId);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index ab7c02b577..4339f8025d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -61,9 +62,12 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -82,6 +86,9 @@ public class PartitionInfo implements SnapshotProcessor {
private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap;
// Map<TConsensusGroupId, allocatedSlotsNumber>
private final Map<TConsensusGroupId, Long> regionSlotsCounter;
+ // preDeleted TODO: Combine it with Partition.class
+ private final Set<String> preDeletedStorageGroup = new CopyOnWriteArraySet<>();
+ private final Set<TRegionReplicaSet> deletedRegionSet = new HashSet<>();
// SchemaPartition read write lock
private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
@@ -166,7 +173,7 @@ public class PartitionInfo implements SnapshotProcessor {
regionReadWriteLock.writeLock().lock();
try {
for (TConsensusGroupId consensusGroupId : req.getConsensusGroupIds()) {
- regionReplicaMap.remove(consensusGroupId);
+ deletedRegionSet.add(regionReplicaMap.remove(consensusGroupId));
regionSlotsCounter.remove(consensusGroupId);
}
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -209,7 +216,7 @@ public class PartitionInfo implements SnapshotProcessor {
try {
schemaPartitionResp.setSchemaPartition(
- schemaPartition.getSchemaPartition(req.getPartitionSlotsMap()));
+ schemaPartition.getSchemaPartition(req.getPartitionSlotsMap(), preDeletedStorageGroup));
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
schemaPartitionResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
@@ -283,7 +290,8 @@ public class PartitionInfo implements SnapshotProcessor {
dataPartition.getDataPartition(
req.getPartitionSlotsMap(),
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
- ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum()));
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
+ preDeletedStorageGroup));
} finally {
dataPartitionReadWriteLock.readLock().unlock();
dataPartitionResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
@@ -410,6 +418,7 @@ public class PartitionInfo implements SnapshotProcessor {
dataPartitionReadWriteLock.writeLock().lock();
try {
dataPartition.getDataPartitionMap().remove(storageGroup);
+ preDeletedStorageGroup.remove(storageGroup);
} finally {
dataPartitionReadWriteLock.writeLock().unlock();
}
@@ -419,11 +428,31 @@ public class PartitionInfo implements SnapshotProcessor {
schemaPartitionReadWriteLock.writeLock().lock();
try {
schemaPartition.getSchemaPartitionMap().remove(storageGroup);
+ preDeletedStorageGroup.remove(storageGroup);
} finally {
schemaPartitionReadWriteLock.writeLock().unlock();
}
}
+  /**
+   * Applies a pre-delete request to the {@code preDeletedStorageGroup} set: {@code EXECUTE} adds
+   * the storage group name, {@code ROLLBACK} removes it.
+   *
+   * @param preDeleteStorageGroupReq the request carrying the storage group name and the type
+   * @return always a {@code SUCCESS_STATUS} status
+   */
+  public TSStatus preDeleteStorageGroup(PreDeleteStorageGroupReq preDeleteStorageGroupReq) {
+    final String sgName = preDeleteStorageGroupReq.getStorageGroup();
+    switch (preDeleteStorageGroupReq.getPreDeleteType()) {
+      case ROLLBACK:
+        preDeletedStorageGroup.remove(sgName);
+        break;
+      case EXECUTE:
+        preDeletedStorageGroup.add(sgName);
+        break;
+      default:
+        // no other type exists
+        break;
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+ /**
+ * Returns the internal set of regions that were removed from regionReplicaMap and still wait
+ * for deletion. This is the live set, not a copy: PartitionManager hands it to
+ * SyncDataNodeClientPool.deleteRegions, which removes the entries it deleted successfully.
+ *
+ * NOTE(review): the set is a plain HashSet and no locking is visible in this accessor; confirm
+ * that callers on other threads coordinate with the writers.
+ */
+ public Set<TRegionReplicaSet> getDeletedRegionSet() {
+ return deletedRegionSet;
+ }
+
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
File snapshotFile = new File(snapshotDir, snapshotFileName);
@@ -447,6 +476,8 @@ public class PartitionInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(nextRegionGroupId.get(), fileOutputStream);
// serialize regionMap
serializeRegionMap(fileOutputStream, protocol);
+ // serialize deletedRegionSet
+ serializeDeletedRegionSet(fileOutputStream, protocol);
// serialize schemaPartition
schemaPartition.serialize(fileOutputStream, protocol);
// serialize dataPartition
@@ -491,6 +522,8 @@ public class PartitionInfo implements SnapshotProcessor {
// start to restore
nextRegionGroupId.set(ReadWriteIOUtils.readInt(fileInputStream));
deserializeRegionMap(fileInputStream, protocol);
+ // deserialize deletedRegionSet
+ deserializeDeletedRegionSet(fileInputStream, protocol);
schemaPartition.deserialize(fileInputStream, protocol);
dataPartition.deserialize(fileInputStream, protocol);
} finally {
@@ -577,6 +610,25 @@ public class PartitionInfo implements SnapshotProcessor {
}
}
+  /**
+   * Writes {@code deletedRegionSet} to the snapshot: first the number of entries, then each
+   * entry through the thrift protocol.
+   *
+   * <p>The count must be the size of {@code deletedRegionSet} itself, because {@code
+   * deserializeDeletedRegionSet} reads exactly that many entries back.
+   *
+   * @param outputStream stream that receives the entry count
+   * @param protocol thrift protocol that receives the entries
+   * @throws TException if writing an entry through the protocol fails
+   * @throws IOException if writing the count fails
+   */
+  private void serializeDeletedRegionSet(OutputStream outputStream, TProtocol protocol)
+      throws TException, IOException {
+    ReadWriteIOUtils.write(deletedRegionSet.size(), outputStream);
+    for (TRegionReplicaSet regionReplicaSet : deletedRegionSet) {
+      regionReplicaSet.write(protocol);
+    }
+  }
+
+  /**
+   * Restores {@code deletedRegionSet} from a snapshot: reads the entry count, then reads that
+   * many entries through the thrift protocol and adds each one to the set.
+   *
+   * @param inputStream stream that provides the entry count
+   * @param protocol thrift protocol that provides the entries
+   * @throws TException if reading an entry through the protocol fails
+   * @throws IOException if reading the count fails
+   */
+  private void deserializeDeletedRegionSet(InputStream inputStream, TProtocol protocol)
+      throws TException, IOException {
+    final int entryCount = ReadWriteIOUtils.readInt(inputStream);
+    for (int remaining = entryCount; remaining > 0; remaining--) {
+      final TRegionReplicaSet restored = new TRegionReplicaSet();
+      restored.read(protocol);
+      deletedRegionSet.add(restored);
+    }
+  }
+
public void clear() {
nextRegionGroupId = new AtomicInteger(0);
regionReplicaMap.clear();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 8f363bbb63..dfad8c10c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartition
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
@@ -140,6 +141,8 @@ public class ConfigRequestExecutor {
case DeleteStorageGroup:
partitionInfo.deleteStorageGroup((DeleteStorageGroupReq) req);
return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupReq) req);
+ case PreDeleteStorageGroup:
+ return partitionInfo.preDeleteStorageGroup((PreDeleteStorageGroupReq) req);
case SetTTL:
return clusterSchemaInfo.setTTL((SetTTLReq) req);
case SetSchemaReplicationFactor:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 922cebff50..307355b6fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -19,19 +19,22 @@
package org.apache.iotdb.confignode.procedure.env;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
public class ConfigNodeProcedureEnv {
@@ -40,45 +43,84 @@ public class ConfigNodeProcedureEnv {
private final ConfigManager configManager;
+ // Test hook: when true, invalidateCache skips contacting DataNodes and returns
+ // invalidCacheResult instead.
+ private static boolean skipForTest = false;
+
+ // Test hook: the value invalidateCache returns while skipForTest is true.
+ private static boolean invalidCacheResult = true;
+
+ /** Enables or disables the test shortcut in invalidateCache. */
+ public static void setSkipForTest(boolean skipForTest) {
+ ConfigNodeProcedureEnv.skipForTest = skipForTest;
+ }
+
+ /** Sets the result that invalidateCache returns while the test shortcut is enabled. */
+ public static void setInvalidCacheResult(boolean result) {
+ ConfigNodeProcedureEnv.invalidCacheResult = result;
+ }
+
public ConfigNodeProcedureEnv(ConfigManager configManager) {
this.configManager = configManager;
}
- // TODO: reuse the same ClientPool with other module
- private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- INTERNAL_SERVICE_CLIENT_MANAGER =
- new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
-
public ConfigManager getConfigManager() {
return configManager;
}
- public InternalService.Client getDataNodeClient(TRegionReplicaSet dataRegionReplicaSet)
- throws IOException {
- List<TDataNodeLocation> dataNodeLocations = dataRegionReplicaSet.getDataNodeLocations();
- int retry = dataNodeLocations.size() - 1;
- for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
- try {
- return INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(dataNodeLocation.getInternalEndPoint());
- } catch (IOException e) {
- if (retry-- > 0) {
- LOG.warn(
- "Connect dataRegion-{} at dataNode-{} failed, trying next replica..",
- dataRegionReplicaSet.getRegionId(),
- dataNodeLocation);
- } else {
- LOG.warn("Connect dataRegion{} failed", dataRegionReplicaSet.getRegionId());
- throw e;
- }
+  /**
+   * Delete the ConfigNode-side information of a storage group by submitting a {@link
+   * DeleteStorageGroupReq} to the ClusterSchemaManager.
+   *
+   * @param deleteSgSchema schema of the storage group to delete
+   * @return the status returned by the ClusterSchemaManager
+   */
+  public TSStatus deleteConfig(TStorageGroupSchema deleteSgSchema) {
+    return configManager
+        .getClusterSchemaManager()
+        .deleteStorageGroup(new DeleteStorageGroupReq(deleteSgSchema));
+  }
+
+ /**
+ * Set or roll back the pre-delete mark of a storage group by delegating to
+ * PartitionManager.preDeleteStorageGroup.
+ *
+ * @param preDeleteType EXECUTE to set the mark, ROLLBACK to remove it
+ * @param deleteSgName storage group name
+ */
+ public void preDelete(PreDeleteStorageGroupReq.PreDeleteType preDeleteType, String deleteSgName) {
+ configManager.getPartitionManager().preDeleteStorageGroup(deleteSgName, preDeleteType);
+ }
+
+ /**
+ * @param storageGroupName Storage group name
+ * @return ALL SUCCESS OR NOT
+ * @throws IOException IOE
+ * @throws TException Thrift IOE
+ */
+ public boolean invalidateCache(String storageGroupName) throws IOException, TException {
+ // TODO: Remove it after IT is supported
+ if (skipForTest) {
+ return invalidCacheResult;
+ }
+ List<TDataNodeInfo> allDataNodes = configManager.getNodeManager().getOnlineDataNodes(-1);
+ TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
+ invalidateCacheReq.setStorageGroup(true);
+ invalidateCacheReq.setFullPath(storageGroupName);
+ for (TDataNodeInfo dataNodeInfo : allDataNodes) {
+ final TSStatus invalidateSchemaStatus =
+ SyncDataNodeClientPool.getInstance()
+ .invalidateSchemaCache(
+ dataNodeInfo.getLocation().getInternalEndPoint(), invalidateCacheReq);
+ final TSStatus invalidatePartitionStatus =
+ SyncDataNodeClientPool.getInstance()
+ .invalidatePartitionCache(
+ dataNodeInfo.getLocation().getInternalEndPoint(), invalidateCacheReq);
+ if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
+ LOG.error(
+ "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}",
+ invalidatePartitionStatus,
+ invalidateSchemaStatus);
+ return false;
}
}
- return null;
+ return true;
}
- public InternalService.Client getDataNodeClient(TDataNodeLocation dataNodeLocation)
- throws IOException {
- return INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(dataNodeLocation.getInternalEndPoint());
+ public boolean verifySucceed(TSStatus... status) {
+ return Arrays.stream(status)
+ .allMatch(tsStatus -> tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
index 3566381500..24b7acc410 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
@@ -19,16 +19,10 @@
package org.apache.iotdb.confignode.procedure.impl;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
-import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -38,8 +32,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.DeleteStorageGroupState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -47,34 +40,21 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
public class DeleteStorageGroupProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, DeleteStorageGroupState> {
private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
private static final int retryThreshold = 5;
- private static boolean byPassForTest = false;
-
- private ConfigManager configManager;
-
- @TestOnly
- public static void setByPassForTest(boolean byPass) {
- byPassForTest = byPass;
- }
-
private TStorageGroupSchema deleteSgSchema;
public DeleteStorageGroupProcedure() {
super();
}
- public DeleteStorageGroupProcedure(
- TStorageGroupSchema deleteSgSchema, ConfigManager configManager) {
+ public DeleteStorageGroupProcedure(TStorageGroupSchema deleteSgSchema) {
super();
this.deleteSgSchema = deleteSgSchema;
- this.configManager = configManager;
}
public TStorageGroupSchema getDeleteSgSchema() {
@@ -91,125 +71,72 @@ public class DeleteStorageGroupProcedure
if (deleteSgSchema == null) {
return Flow.NO_MORE_STATE;
}
- String storageGroupName = deleteSgSchema.getName();
- List<TConsensusGroupId> dataRegionGroupIds = deleteSgSchema.getDataRegionGroupIds();
- List<TConsensusGroupId> schemaRegionGroupIds = deleteSgSchema.getSchemaRegionGroupIds();
- List<TRegionReplicaSet> dataRegionReplicaSets =
- new ArrayList<>(
- configManager.getPartitionManager().getRegionReplicaSets(dataRegionGroupIds));
- List<TRegionReplicaSet> schemaRegionReplicaSets =
- new ArrayList<>(
- configManager.getPartitionManager().getRegionReplicaSets(schemaRegionGroupIds));
try {
switch (state) {
case DELETE_STORAGE_GROUP_PREPARE:
// TODO: lock related ClusterSchemaInfo, PartitionInfo and Regions
- setNextState(DeleteStorageGroupState.DELETE_DATA_REGION);
+ setNextState(DeleteStorageGroupState.DELETE_PRE);
break;
- case DELETE_DATA_REGION:
- LOG.info("Delete dataRegions of {}", storageGroupName);
- if (byPassForTest || deleteRegion(env, dataRegionReplicaSets)) {
- setNextState(DeleteStorageGroupState.DELETE_SCHEMA_REGION);
- }
+ case DELETE_PRE:
+ LOG.info("Pre delete for Storage group {}", deleteSgSchema.getName());
+ env.preDelete(PreDeleteStorageGroupReq.PreDeleteType.EXECUTE, deleteSgSchema.getName());
+ setNextState(DeleteStorageGroupState.INVALIDATE_CACHE);
break;
- case DELETE_SCHEMA_REGION:
- LOG.info("Delete schemaRegions of {}", storageGroupName);
- if (byPassForTest || deleteRegion(env, schemaRegionReplicaSets)) {
+ case INVALIDATE_CACHE:
+ LOG.info("Invalidate cache of {}", deleteSgSchema.getName());
+ if (env.invalidateCache(deleteSgSchema.getName())) {
setNextState(DeleteStorageGroupState.DELETE_CONFIG);
+ } else {
+ setFailure(new ProcedureException("Invalidate cache failed"));
}
break;
case DELETE_CONFIG:
- LOG.info("Delete config info of {}", storageGroupName);
- TSStatus status = deleteConfig(env, deleteSgSchema);
- if (verifySucceed(status)) {
- if (byPassForTest) {
- return Flow.NO_MORE_STATE;
- }
- setNextState(DeleteStorageGroupState.INVALIDATE_CACHE);
+ LOG.info("Delete config info of {}", deleteSgSchema.getName());
+ TSStatus status = env.deleteConfig(deleteSgSchema);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return Flow.NO_MORE_STATE;
} else if (getCycles() > retryThreshold) {
- setFailure(
- new org.apache.iotdb.confignode.procedure.exception.ProcedureException(
- "Delete config info id failed, status is " + status));
+ setFailure(new ProcedureException("Delete config info id failed"));
}
- break;
- case INVALIDATE_CACHE:
- LOG.info("Invalidate cache of {}", storageGroupName);
- invalidateCache(env, storageGroupName);
- return Flow.NO_MORE_STATE;
}
} catch (TException | IOException e) {
- LOG.error(
- "Retriable error trying to delete storage group {}, state {}",
- storageGroupName,
- state,
- e);
- if (getCycles() > retryThreshold) {
- setFailure(
- new org.apache.iotdb.confignode.procedure.exception.ProcedureException(
- "State stack at " + state));
- }
- }
- return Flow.HAS_MORE_STATE;
- }
-
- private TSStatus deleteConfig(ConfigNodeProcedureEnv env, TStorageGroupSchema deleteSgSchema) {
- DeleteStorageGroupReq deleteStorageGroupReq = new DeleteStorageGroupReq(deleteSgSchema);
- return env.getConfigManager()
- .getClusterSchemaManager()
- .deleteStorageGroup(deleteStorageGroupReq);
- }
-
- private boolean deleteRegion(
- ConfigNodeProcedureEnv env, List<TRegionReplicaSet> regionReplicaSets) throws TException {
- for (TRegionReplicaSet dataRegionReplicaSet : regionReplicaSets) {
- TConsensusGroupId regionId = dataRegionReplicaSet.getRegionId();
- InternalService.Client dataNodeClient = null;
- try {
- dataNodeClient = env.getDataNodeClient(dataRegionReplicaSet);
- if (dataNodeClient != null) {
- TSStatus status = dataNodeClient.deleteRegion(regionId);
- if (status.getCode() != StatusUtils.OK.getCode()) {
- if (getCycles() > retryThreshold) {
- setFailure(
- new org.apache.iotdb.confignode.procedure.exception.ProcedureException(
- "Delete data region id=" + regionId + " failed, status is " + status));
- }
- return false;
- }
- LOG.info("Delete region {} success", regionId);
- }
- } catch (IOException e) {
- LOG.error("Connect dataRegion-{} failed", dataRegionReplicaSet.getRegionId(), e);
+ if (isRollbackSupported(state)) {
+ setFailure(new ProcedureException("Delete storage group failed " + state));
+ } else {
+ LOG.error(
+ "Retriable error trying to delete storage group {}, state {}",
+ deleteSgSchema.getName(),
+ state,
+ e);
if (getCycles() > retryThreshold) {
- setFailure(
- new ProcedureException(
- "Delete data region id=" + regionId + " failed", e.getCause()));
+ setFailure(new ProcedureException("State stuck at " + state));
}
- return false;
}
}
- return true;
+ return Flow.HAS_MORE_STATE;
}
- private void invalidateCache(ConfigNodeProcedureEnv env, String storageGroupName)
- throws IOException, TException {
- List<TDataNodeInfo> allDataNodes =
- env.getConfigManager().getNodeManager().getOnlineDataNodes(-1);
- TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
- invalidateCacheReq.setStorageGroup(true);
- invalidateCacheReq.setFullPath(storageGroupName);
- for (TDataNodeInfo dataNodeInfo : allDataNodes) {
- env.getDataNodeClient(dataNodeInfo.getLocation()).invalidateSchemaCache(invalidateCacheReq);
- env.getDataNodeClient(dataNodeInfo.getLocation())
- .invalidatePartitionCache(invalidateCacheReq);
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, DeleteStorageGroupState state)
+ throws IOException, InterruptedException {
+ switch (state) {
+ case DELETE_PRE:
+ case INVALIDATE_CACHE:
+ LOG.info("Rollback preDeleted:{}", deleteSgSchema.getName());
+ env.preDelete(PreDeleteStorageGroupReq.PreDeleteType.ROLLBACK, deleteSgSchema.getName());
+ break;
}
}
@Override
- protected void rollbackState(
- ConfigNodeProcedureEnv clusterProcedureEnvironment,
- DeleteStorageGroupState deleteStorageGroupState)
- throws IOException, InterruptedException {}
+ protected boolean isRollbackSupported(DeleteStorageGroupState state) {
+ switch (state) {
+ case DELETE_PRE:
+ case INVALIDATE_CACHE:
+ return true;
+ }
+ return false;
+ }
@Override
protected DeleteStorageGroupState getState(int stateId) {
@@ -239,14 +166,10 @@ public class DeleteStorageGroupProcedure
try {
deleteSgSchema = ThriftConfigNodeSerDeUtils.deserializeTStorageGroupSchema(byteBuffer);
} catch (ThriftSerDeException e) {
- LOG.error("error in deser", e);
+ LOG.error("Error in deserialize DeleteStorageGroupProcedure", e);
}
}
- public boolean verifySucceed(TSStatus status) {
- return status.getCode() == StatusUtils.OK.getCode();
- }
-
@Override
public boolean equals(Object that) {
if (that instanceof DeleteStorageGroupProcedure) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java
index 4ac0531dd2..2ca63c67a8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java
@@ -21,8 +21,7 @@ package org.apache.iotdb.confignode.procedure.state;
public enum DeleteStorageGroupState {
DELETE_STORAGE_GROUP_PREPARE,
- DELETE_DATA_REGION,
- DELETE_SCHEMA_REGION,
- DELETE_CONFIG,
- INVALIDATE_CACHE
+ DELETE_PRE,
+ INVALIDATE_CACHE,
+ DELETE_CONFIG
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 58bfb4c8d8..c1e1eb7a86 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -129,6 +129,8 @@ public class PartitionInfoTest {
int nextId = partitionInfo.getNextRegionGroupId();
Map<TConsensusGroupId, Long> counter_before = partitionInfo.getRegionSlotsCounter();
+ partitionInfo.getDeletedRegionSet().add(dataRegionReplicaSet);
+ partitionInfo.getDeletedRegionSet().add(schemaRegionReplicaSet);
partitionInfo.processTakeSnapshot(snapshotDir);
partitionInfo.clear();
partitionInfo.processLoadSnapshot(snapshotDir);
@@ -161,6 +163,9 @@ public class PartitionInfoTest {
Assert.assertEquals(counter_before, partitionInfo.getRegionSlotsCounter());
Assert.assertEquals(dataMap_before, partitionInfo.getDataPartition().getDataPartitionMap());
+ Assert.assertEquals(2, partitionInfo.getDeletedRegionSet().size());
+ Assert.assertTrue(partitionInfo.getDeletedRegionSet().contains(dataRegionReplicaSet));
+ Assert.assertTrue(partitionInfo.getDeletedRegionSet().contains(schemaRegionReplicaSet));
}
private TRegionReplicaSet generateTRegionReplicaSet(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureBase.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureBase.java
index fe630681ef..bce76d3f45 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureBase.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureBase.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.procedure;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import org.junit.After;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index aa35a33e56..046e4f9ffa 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
index 6c4ab3234d..7b387d3d43 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.confignode.procedure.entity.SimpleSTMProcedure;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
index b5c8c9b543..0033971262 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
index f2250ec32e..bdaf0401cc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index 98cd53edc6..564b980794 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
index a95905ca76..86161619b3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleSTMProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
index 3e2816fed4..0b9f420b31 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
index 9363a7e793..1db02597c4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import java.io.IOException;
import java.util.concurrent.Semaphore;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
index f78ae176d8..adba817b4d 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckSTMProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.entity;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestConfigNodeEnv.java
similarity index 79%
copy from confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestConfigNodeEnv.java
index 4ac0531dd2..a27b2244ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteStorageGroupState.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestConfigNodeEnv.java
@@ -17,12 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.state;
+package org.apache.iotdb.confignode.procedure.env;
-public enum DeleteStorageGroupState {
- DELETE_STORAGE_GROUP_PREPARE,
- DELETE_DATA_REGION,
- DELETE_SCHEMA_REGION,
- DELETE_CONFIG,
- INVALIDATE_CACHE
-}
+public class TestConfigNodeEnv {}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcEnv.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestProcEnv.java
similarity index 97%
rename from confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcEnv.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestProcEnv.java
index cfe77585a5..63e4f1997b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcEnv.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/TestProcEnv.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure;
+package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
index 6d691cc201..9e1139adc3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
-import org.apache.iotdb.confignode.procedure.TestProcEnv;
import org.apache.iotdb.confignode.procedure.TestProcedureBase;
import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckSTMProcedure;
import org.apache.iotdb.confignode.procedure.entity.TestProcedureFactory;
+import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index a3d02b596d..371993af04 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -37,8 +37,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
@@ -50,8 +49,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
@@ -150,7 +147,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void testRegisterAndQueryDataNode() throws TException {
+ public void registerAndQueryDataNodeTest() throws TException {
registerDataNodes();
// test success re-register
@@ -204,7 +201,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void testSetAndQueryStorageGroup() throws TException {
+ public void setAndQueryStorageGroupTest() throws TException {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -306,7 +303,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void testGetAndCreateSchemaPartition()
+ public void getAndCreateSchemaPartitionTest()
throws TException, IOException, IllegalPathException {
final String sg = "root.sg";
final String sg0 = "root.sg0";
@@ -502,7 +499,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void testGetAndCreateDataPartition() throws TException {
+ public void getAndCreateDataPartitionTest() throws TException {
final String sg = "root.sg";
final int storageGroupNum = 2;
final int seriesPartitionSlotNum = 4;
@@ -562,7 +559,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@Test
- public void testPermission() throws TException {
+ public void permissionTest() throws TException {
TSStatus status;
List<String> userList = new ArrayList<>();
@@ -898,6 +895,60 @@ public class ConfigNodeRPCServiceProcessorTest {
}
}
+ @Test
+ public void deleteStorageGroupTest() throws TException {
+ TSStatus status;
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+ // register DataNodes
+ registerDataNodes();
+ ConfigNodeProcedureEnv.setSkipForTest(true);
+ TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+ // set StorageGroup0 by default values
+ status = processor.setStorageGroup(setReq0);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ // set StorageGroup1 by specific values
+ TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+ status = processor.setStorageGroup(setReq1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
+ List<String> sgs = Arrays.asList(sg0, sg1);
+ deleteStorageGroupsReq.setPrefixPathList(sgs);
+ TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
+ TStorageGroupSchemaResp root =
+ processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
+ Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
+ }
+
+ @Test
+ public void deleteStorageGroupInvalidateCacheFailedTest() throws TException {
+ TSStatus status;
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+ // register DataNodes
+ registerDataNodes();
+ ConfigNodeProcedureEnv.setSkipForTest(true);
+ ConfigNodeProcedureEnv.setInvalidCacheResult(false);
+ TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+ // set StorageGroup0 by default values
+ status = processor.setStorageGroup(setReq0);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ // set StorageGroup1 by specific values
+ TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+ status = processor.setStorageGroup(setReq1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
+ List<String> sgs = Arrays.asList(sg0, sg1);
+ deleteStorageGroupsReq.setPrefixPathList(sgs);
+ TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
+ TStorageGroupSchemaResp root =
+ processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
+ // rollback success
+ Assert.assertEquals(root.getStorageGroupSchemaMap().size(), 2);
+ Assert.assertEquals(TSStatusCode.MULTIPLE_ERROR.getStatusCode(), deleteSgStatus.getCode());
+ }
+
private void cleanUserAndRole() throws TException {
TSStatus status;
@@ -949,60 +1000,4 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}
}
-
- @Test
- public void testDeleteStorageGroup() throws TException {
- TSStatus status;
- final String sg0 = "root.sg0";
- final String sg1 = "root.sg1";
- // register DataNodes
- registerDataNodes();
- TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- // set StorageGroup0 by default values
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // set StorageGroup1 by specific values
- TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
- status = processor.setStorageGroup(setReq1);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
- List<String> sgs = Arrays.asList(sg0, sg1);
- deleteStorageGroupsReq.setPrefixPathList(sgs);
- DeleteStorageGroupProcedure.setByPassForTest(true);
- TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
- TStorageGroupSchemaResp root =
- processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
- Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
- }
-
- @Test
- public void testGetSchemaNodeManagementPartition()
- throws TException, IllegalPathException, IOException {
- final String sg = "root.sg";
- final int storageGroupNum = 2;
-
- TSStatus status;
- TSchemaNodeManagementReq nodeManagementReq;
- TSchemaNodeManagementResp nodeManagementResp;
-
- // register DataNodes
- registerDataNodes();
-
- // set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- }
-
- ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
- nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer, NodeManagementType.CHILD_PATHS);
- nodeManagementResp = processor.getSchemaNodeManagementPartition(nodeManagementReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
- Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
- Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
- Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
- }
}
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index 576c319262..2f9245a664 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -22,8 +22,17 @@ rpc_port=22277
consensus_port=22278
target_confignode=0.0.0.0:22277
config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+#data_node_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAloneConsensus
data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+# Default number of SchemaRegion replicas
+# Datatype: int
+#schema_replication_factor=1
+
+
+# Default number of DataRegion replicas
+# Datatype: int
+#data_replication_factor=1
system_dir=target/confignode1/system
data_dirs=target/confignode1/data
consensus_dir=target/confignode1/consensus
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3fff3e130c..e622a284de 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.stream.Collectors;
public class DataPartition extends Partition {
@@ -138,13 +139,15 @@ public class DataPartition extends Partition {
public DataPartition getDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap,
String seriesSlotExecutorName,
- int seriesPartitionSlotNum) {
+ int seriesPartitionSlotNum,
+ Set<String> preDeletedStorageGroup) {
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
result = new HashMap<>();
for (String storageGroupName : partitionSlotsMap.keySet()) {
// Compare StorageGroupName
- if (dataPartitionMap.containsKey(storageGroupName)) {
+ if (dataPartitionMap.containsKey(storageGroupName)
+ && !preDeletedStorageGroup.contains(storageGroupName)) {
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
seriesTimePartitionSlotMap = dataPartitionMap.get(storageGroupName);
for (TSeriesPartitionSlot seriesPartitionSlot :
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 2b05700add..2311f7a743 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
public class SchemaPartition extends Partition {
@@ -90,21 +91,28 @@ public class SchemaPartition extends Partition {
* Get SchemaPartition by partitionSlotsMap
*
* @param partitionSlotsMap Map<StorageGroup, List<SeriesPartitionSlot>>
+ * @param preDeletedStorageGroup names of StorageGroups to exclude from the returned result
* @return Subset of current SchemaPartition, including Map<StorageGroup, Map<SeriesPartitionSlot,
* RegionReplicaSet>>
*/
public SchemaPartition getSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
+ Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap,
+ Set<String> preDeletedStorageGroup) {
if (partitionSlotsMap.isEmpty()) {
// Return all SchemaPartitions when the partitionSlotsMap is empty
- return new SchemaPartition(
- new HashMap<>(schemaPartitionMap), seriesSlotExecutorName, seriesPartitionSlotNum);
+ final Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> resultAll =
+ new HashMap<>(schemaPartitionMap);
+ for (String preDeleted : preDeletedStorageGroup) {
+ resultAll.remove(preDeleted);
+ }
+ return new SchemaPartition(resultAll, seriesSlotExecutorName, seriesPartitionSlotNum);
} else {
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> result = new HashMap<>();
partitionSlotsMap.forEach(
(storageGroup, seriesPartitionSlots) -> {
- if (schemaPartitionMap.containsKey(storageGroup)) {
+ if (schemaPartitionMap.containsKey(storageGroup)
+ && !preDeletedStorageGroup.contains(storageGroup)) {
if (seriesPartitionSlots.isEmpty()) {
// Return all SchemaPartitions in one StorageGroup when the queried
// SeriesPartitionSlots is empty
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 944b84669b..479963c215 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -113,10 +113,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
// So we don't need to clean up it manually
Optional<TsBlock> tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- if (!tsBlock.isPresent()) {
+ if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
-
Binary binary;
SchemaTree fetchedSchemaTree;
Column column = tsBlock.get().getColumn(0);