You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/01/14 03:49:58 UTC

[GitHub] [iotdb] jt2594838 commented on a change in pull request #2405: [IOTDB-1079] Virtual storage group first mile stone

jt2594838 commented on a change in pull request #2405:
URL: https://github.com/apache/iotdb/pull/2405#discussion_r557002528



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -319,35 +360,88 @@ public ServiceType getID() {
     return ServiceType.STORAGE_ENGINE_SERVICE;
   }
 
-  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+  /**
+   * This method is for sync, delete tsfile or sth like them, just get storage group directly by sg
+   * name
+   *
+   * @param path storage group path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+      throws StorageEngineException {
     PartialPath storageGroupPath;
     try {
       StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
       storageGroupPath = storageGroupMNode.getPartialPath();
-      StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-      if (processor == null) {
+      return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * This method is for insert and query or sth like them, this may get a virtual storage group
+   *
+   * @param path device path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+    try {
+      StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+      return getStorageGroupProcessorByPath(path, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * get storage group processor by path
+   * @param storageGroupPath path of the storage group
+   * @param storageGroupMNode mnode of the storage group

Review comment:
       It is called `storageGroupPath`, but it may actually not be a path of a storage group (it could be a deviceId or even a lower level), so I think the naming is confusing.
   Why is `storageGroupMNode ` needed should be further explained.

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -231,6 +231,11 @@ mtree_snapshot_interval=100000
 # Only take effect when enable_mtree_snapshot=true.
 mtree_snapshot_threshold_time=3600
 
+# number of virtual storage group
+# virtual storage group is the unit of concurrency in memory
+# recommend value is [storage group number] * [virtual storage group number] = virtual cpu core number

Review comment:
       number of virtual storage groups per user-defined storage group
   a virtual storage group is the unit of parallelism in memory as all ingestions in one virtual storage group are serialized
   recommended value is  [virtual storage group number] = [CPU core number] / [user-defined storage group number]

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -845,6 +845,11 @@
    */
   private boolean debugState = false;
 
+  /**
+   * the number of virtual storage group
+   */
+  private int virtualStorageGroupNum = 8;

Review comment:
       the number of virtual storage groups per user-defined storage group

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -742,24 +911,19 @@ public void setFileFlushPolicy(TsFileFlushPolicy fileFlushPolicy) {
 
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
       long partitionNum) {
-    StorageGroupProcessor processor = processorMap.get(storageGroup);
-    return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
-  }
-
-  public static long getTimePartitionInterval() {
-    if (timePartitionInterval == -1) {
-      initTimePartition();
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroup);
+    if (virtualStorageGroupManager == null) {
+      return false;
     }
-    return timePartitionInterval;
-  }
-
-  @TestOnly
-  public static void setTimePartitionInterval(long timePartitionInterval) {
-    StorageEngine.timePartitionInterval = timePartitionInterval;
-  }
 
-  public static long getTimePartition(long time) {
-    return enablePartition ? time / timePartitionInterval : 0;
+    for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null && storageGroupProcessor
+          .isFileAlreadyExist(tsFileResource, partitionNum)) {
+        return true;
+      }
+    }

Review comment:
       This is a bit inefficient, and probably you can infer the virtual storage group of the file using its devices.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -457,7 +460,7 @@ private VersionController getVersionControllerByTimePartitionId(long timePartiti
     List<File> tsFiles = new ArrayList<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
-      File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+      File fileFolder = fsFactory.getFile(baseDir + File.separator + logicalStorageGroupName, storageGroupName);

Review comment:
       This conflicts with what you have written in `SlotTsFileFilter.java`, please confirm it.

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/virtualSG/DeviceMappingViewer.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.virtualSG;
+
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+/**
+ * for DBA to view the mapping from device to virtual storage group ID
+ * usage: run this class with arguments [system_schema_dir], if args are not given, use default in config
+ */
+public class DeviceMappingViewer {
+
+  public static void main(String[] args) throws MetadataException {
+    // has schema log dir
+    if(args.length == 1){
+      IoTDBDescriptor.getInstance().getConfig().setSchemaDir(args[0]);
+    }
+
+    HashVirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+    IoTDBDescriptor.getInstance().getConfig().setEnableMTreeSnapshot(false);
+    MManager mManager = MManager.getInstance();
+    mManager.init();

Review comment:
       `MManager.init()` initializes writers, so it may throw an IOException if it is used against a working IoTDB instance because the files (mlog and tlog) have already been opened by that IoTDB.
   Maybe a read-only mode of `MManager` is necessary.

##########
File path: server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
##########
@@ -180,8 +180,12 @@ public void testDeleteInBufferWriteFile()
       assertTrue(directory.isDirectory());
       if (directory.isDirectory()) {
         for (File file : directory.listFiles()) {
-          if (file.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
-            modFiles.add(file);
+          if(file.isDirectory()){

Review comment:
       Please perform a reformat here and there.

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
##########
@@ -113,6 +114,11 @@ public void testRestartDelete()
 
     try {
       EnvironmentUtils.restartDaemon();
+      StorageEngine.getInstance().recover();
+      // wait for recover
+      while(!StorageEngine.getInstance().isAllSgReady()){
+        Thread.sleep(500);

Review comment:
       Better to add some logs if it has waited for too long, so we can know what went wrong.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -319,35 +360,88 @@ public ServiceType getID() {
     return ServiceType.STORAGE_ENGINE_SERVICE;
   }
 
-  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+  /**
+   * This method is for sync, delete tsfile or sth like them, just get storage group directly by sg
+   * name
+   *
+   * @param path storage group path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+      throws StorageEngineException {
     PartialPath storageGroupPath;
     try {
       StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
       storageGroupPath = storageGroupMNode.getPartialPath();
-      StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-      if (processor == null) {
+      return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * This method is for insert and query or sth like them, this may get a virtual storage group
+   *
+   * @param path device path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+    try {
+      StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+      return getStorageGroupProcessorByPath(path, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * get storage group processor by path
+   * @param storageGroupPath path of the storage group
+   * @param storageGroupMNode mnode of the storage group
+   * @return found or new storage group processor
+   */
+  @SuppressWarnings("java:S2445")
+  // actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
+  private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
+      StorageGroupMNode storageGroupMNode)
+      throws StorageGroupProcessorException, StorageEngineException {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+        .get(storageGroupMNode.getPartialPath());
+    if (virtualStorageGroupManager == null) {
+      // if finish recover
+      if (isAllSgReady.get()) {
         waitAllSgReady(storageGroupPath);
-        // if finish recover
-        if (isAllSgReady.get()) {
-          synchronized (storageGroupMNode) {
-            processor = processorMap.get(storageGroupPath);
-            if (processor == null) {
-              logger.info("construct a processor instance, the storage group is {}, Thread is {}",
-                  storageGroupPath, Thread.currentThread().getId());
-              processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
-                  fileFlushPolicy);
-              processor.setDataTTL(storageGroupMNode.getDataTTL());
-              processor.setCustomFlushListeners(customFlushListeners);
-              processor.setCustomCloseFileListeners(customCloseFileListeners);
-              processorMap.put(storageGroupPath, processor);
-            }
+        synchronized (storageGroupMNode) {
+          virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+          if (virtualStorageGroupManager == null) {
+            virtualStorageGroupManager = new VirtualStorageGroupManager();
+            processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
           }
         }
+      } else {
+        // not finished recover, refuse the request
+        throw new StorageEngineException(
+            "the sg " + storageGroupMNode.getPartialPath()
+                + " may not ready now, please wait and retry later",
+            TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
       }
-      return processor;
-    } catch (StorageGroupProcessorException | MetadataException e) {
-      throw new StorageEngineException(e);
     }
+    return virtualStorageGroupManager.getProcessor(storageGroupPath, storageGroupMNode);
+  }
+
+  public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath storageGroupPath,
+      StorageGroupMNode storageGroupMNode, String storageGroupName)
+      throws StorageGroupProcessorException {
+    StorageGroupProcessor processor;
+    logger.info("construct a processor instance, the storage group is {}, Thread is {}",
+        storageGroupPath, Thread.currentThread().getId());
+    processor = new StorageGroupProcessor(systemDir + File.separator + storageGroupPath,
+        storageGroupName,
+        fileFlushPolicy, storageGroupMNode.getFullPath());
+    processor.setDataTTL(storageGroupMNode.getDataTTL());
+    processor.setCustomFlushListeners(customFlushListeners);
+    processor.setCustomCloseFileListeners(customCloseFileListeners);
+    return processor;
   }

Review comment:
       ![image](https://user-images.githubusercontent.com/23610645/104537821-a7a8d280-5655-11eb-8cd9-b471372ebc2f.png)
   While in VirtualStorageGroupManager, `storageGroupName` is passed as the virtual storage group number, which is also confusing.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -656,21 +802,38 @@ public synchronized boolean deleteAll() {
   }
 
   public void setTTL(PartialPath storageGroup, long dataTTL) throws StorageEngineException {
-    StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroup);
-    storageGroupProcessor.setDataTTL(dataTTL);
+    // storage group has no data
+    if (!processorMap.containsKey(storageGroup)) {
+      return;
+    }
+
+    for (StorageGroupProcessor storageGroupProcessor : processorMap.get(storageGroup)
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null) {
+        storageGroupProcessor.setDataTTL(dataTTL);
+      }
+    }
   }
 
   public void deleteStorageGroup(PartialPath storageGroupPath) {
+    if (!processorMap.containsKey(storageGroupPath)) {
+      return;
+    }
+
     deleteAllDataFilesInOneStorageGroup(storageGroupPath);
-    StorageGroupProcessor processor = processorMap.remove(storageGroupPath);
-    if (processor != null) {
-      processor.deleteFolder(systemDir);
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.remove(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
+      if (processor != null) {
+        processor.deleteFolder(systemDir + File.pathSeparator + storageGroupPath);
+      }
     }
   }
 
   public void loadNewTsFileForSync(TsFileResource newTsFileResource)
       throws StorageEngineException, LoadFileException, IllegalPathException {
-    getProcessor(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
+    getProcessorDirectly(new PartialPath(
+        newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))

Review comment:
       May you can use `getSgByEngineFile`.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -592,8 +717,13 @@ public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryConte
    */
   public int countUpgradeFiles() {
     int totalUpgradeFileNum = 0;
-    for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
-      totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
+    for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager
+          .getAllVirutalStorageGroupProcessor()) {
+        if (storageGroupProcessor != null) {
+          totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
+        }
+      }

Review comment:
       Optional: move such loops into VirtualSGManager to reduce upper-level changes.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -319,35 +360,88 @@ public ServiceType getID() {
     return ServiceType.STORAGE_ENGINE_SERVICE;
   }
 
-  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+  /**
+   * This method is for sync, delete tsfile or sth like them, just get storage group directly by sg
+   * name
+   *
+   * @param path storage group path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+      throws StorageEngineException {
     PartialPath storageGroupPath;
     try {
       StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
       storageGroupPath = storageGroupMNode.getPartialPath();
-      StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-      if (processor == null) {
+      return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * This method is for insert and query or sth like them, this may get a virtual storage group
+   *
+   * @param path device path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+    try {
+      StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+      return getStorageGroupProcessorByPath(path, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * get storage group processor by path
+   * @param storageGroupPath path of the storage group
+   * @param storageGroupMNode mnode of the storage group
+   * @return found or new storage group processor
+   */
+  @SuppressWarnings("java:S2445")
+  // actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
+  private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
+      StorageGroupMNode storageGroupMNode)
+      throws StorageGroupProcessorException, StorageEngineException {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+        .get(storageGroupMNode.getPartialPath());
+    if (virtualStorageGroupManager == null) {
+      // if finish recover
+      if (isAllSgReady.get()) {
         waitAllSgReady(storageGroupPath);
-        // if finish recover
-        if (isAllSgReady.get()) {
-          synchronized (storageGroupMNode) {
-            processor = processorMap.get(storageGroupPath);
-            if (processor == null) {
-              logger.info("construct a processor instance, the storage group is {}, Thread is {}",
-                  storageGroupPath, Thread.currentThread().getId());
-              processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
-                  fileFlushPolicy);
-              processor.setDataTTL(storageGroupMNode.getDataTTL());
-              processor.setCustomFlushListeners(customFlushListeners);
-              processor.setCustomCloseFileListeners(customCloseFileListeners);
-              processorMap.put(storageGroupPath, processor);
-            }
+        synchronized (storageGroupMNode) {
+          virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+          if (virtualStorageGroupManager == null) {
+            virtualStorageGroupManager = new VirtualStorageGroupManager();
+            processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
           }
         }
+      } else {
+        // not finished recover, refuse the request
+        throw new StorageEngineException(
+            "the sg " + storageGroupMNode.getPartialPath()
+                + " may not ready now, please wait and retry later",
+            TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
       }
-      return processor;
-    } catch (StorageGroupProcessorException | MetadataException e) {
-      throw new StorageEngineException(e);
     }
+    return virtualStorageGroupManager.getProcessor(storageGroupPath, storageGroupMNode);
+  }
+
+  public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath storageGroupPath,
+      StorageGroupMNode storageGroupMNode, String storageGroupName)
+      throws StorageGroupProcessorException {
+    StorageGroupProcessor processor;
+    logger.info("construct a processor instance, the storage group is {}, Thread is {}",
+        storageGroupPath, Thread.currentThread().getId());
+    processor = new StorageGroupProcessor(systemDir + File.separator + storageGroupPath,
+        storageGroupName,
+        fileFlushPolicy, storageGroupMNode.getFullPath());
+    processor.setDataTTL(storageGroupMNode.getDataTTL());
+    processor.setCustomFlushListeners(customFlushListeners);
+    processor.setCustomCloseFileListeners(customCloseFileListeners);
+    return processor;
   }

Review comment:
       ![image](https://user-images.githubusercontent.com/23610645/104537372-d1adc500-5654-11eb-964f-4ebad49a6d6c.png)
   In the constructor of `StorageGroupProcessor`, the last parameter is called `logicalStorageGroupName`, which does not seemly imply `storageGroupMNode.getFullPath()`.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -513,31 +613,37 @@ public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partit
       boolean isSeq,
       boolean isSync)
       throws StorageGroupNotSetException {
-    StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-    if (processor == null) {
-      throw new StorageGroupNotSetException(storageGroupPath.getFullPath(), true);
-    }
-
-    logger.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
-        storageGroupPath, isSeq, partitionId);
-    processor.writeLock();
-    // to avoid concurrent modification problem, we need a new array list
-    List<TsFileProcessor> processors = isSeq ?
-        new ArrayList<>(processor.getWorkSequenceTsFileProcessors()) :
-        new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
-    try {
-      for (TsFileProcessor tsfileProcessor : processors) {
-        if (tsfileProcessor.getTimeRangeId() == partitionId) {
-          if (isSync) {
-            processor.syncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
-          } else {
-            processor.asyncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
+    if (!processorMap.containsKey(storageGroupPath)) {
+      throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
+    }
+
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
+      if (processor != null) {
+        logger
+            .info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
+                storageGroupPath, isSeq, partitionId);

Review comment:
       Also include the virtual group number.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public interface VirtualPartitioner {
+
+  /**
+   * use device id to determine storage group id
+   *
+   * @param deviceId device id
+   * @return virtual storage group id
+   */
+  public int deviceToStorageGroup(PartialPath deviceId);

Review comment:
       Maybe `deviceToPartitionId` or `deviceToVirtuaStoragelGroupNumber`  is more accurate.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -449,56 +542,63 @@ private void updateMonitorStatistics(StorageGroupProcessor processor, InsertPlan
    */
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
-    for (StorageGroupProcessor processor : processorMap.values()) {
+    for (VirtualStorageGroupManager processor : processorMap.values()) {
       processor.syncCloseAllWorkingTsFileProcessors();
     }
   }
 
   public void forceCloseAllProcessor() throws TsFileProcessorException {
     logger.info("Start force closing all storage group processor");
-    for (StorageGroupProcessor processor : processorMap.values()) {
+    for (VirtualStorageGroupManager processor : processorMap.values()) {
       processor.forceCloseAllWorkingTsFileProcessors();
     }
   }
 
   public void closeStorageGroupProcessor(PartialPath storageGroupPath, boolean isSeq,
       boolean isSync) {
-    StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-    if (processor == null) {
+    if (!processorMap.containsKey(storageGroupPath)) {
       return;
     }
 
-    if (logger.isInfoEnabled()) {
-      logger.info("{} closing sg processor is called for closing {}, seq = {}",
-          isSync ? "sync" : "async", storageGroupPath,
-          isSeq);
-    }
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
+      if (processor == null) {
+        continue;
+      }
 
-    processor.writeLock();
-    try {
-      if (isSeq) {
-        // to avoid concurrent modification problem, we need a new array list
-        for (TsFileProcessor tsfileProcessor : new ArrayList<>(
-            processor.getWorkSequenceTsFileProcessors())) {
-          if (isSync) {
-            processor.syncCloseOneTsFileProcessor(true, tsfileProcessor);
-          } else {
-            processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
+      if (logger.isInfoEnabled()) {
+        logger.info("{} closing sg processor is called for closing {}, seq = {}",
+            isSync ? "sync" : "async", storageGroupPath,
+            isSeq);
+      }

Review comment:
       Maybe the virtual group number should be added or `storageGroupPath` should be replaced with the processor's path, otherwise, several identical logs will be printed.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -712,25 +875,31 @@ public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
    * @return sg name
    */
   private String getSgByEngineFile(File file) {
-    return file.getParentFile().getParentFile().getName();
+    return file.getParentFile().getParentFile().getParentFile().getName();
   }
 
   /**
    * @return TsFiles (seq or unseq) grouped by their storage group and partition number.
    */
   public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
     Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
-    for (Entry<PartialPath, StorageGroupProcessor> entry : processorMap.entrySet()) {
-      List<TsFileResource> allResources = entry.getValue().getSequenceFileTreeSet();
-      allResources.addAll(entry.getValue().getUnSequenceFileList());
-      for (TsFileResource sequenceFile : allResources) {
-        if (!sequenceFile.isClosed()) {
-          continue;
+    for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
+      for (StorageGroupProcessor storageGroupProcessor : entry.getValue()
+          .getAllVirutalStorageGroupProcessor()) {
+        if (storageGroupProcessor != null) {
+          List<TsFileResource> allResources = storageGroupProcessor.getSequenceFileTreeSet();
+          allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
+          for (TsFileResource sequenceFile : allResources) {

Review comment:
       The name `sequenceFile` is inaccurate as `allResources` contains both seq and unseq files. Maybe just use `tsFile`.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualStorageGroupManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
+
+  /**
+   * virtual storage group partitioner
+   */
+  VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+
+  /**
+   * all virtual storage group processor
+   */
+  StorageGroupProcessor[] virtualStorageGroupProcessor;
+
+  /**
+   * get all virtual storage group Processor
+   * @return all virtual storage group Processor
+   */
+  public StorageGroupProcessor[] getAllVirutalStorageGroupProcessor(){
+    return virtualStorageGroupProcessor;
+  }
+
+  public VirtualStorageGroupManager(){
+    virtualStorageGroupProcessor = new StorageGroupProcessor[partitioner.getPartitionCount()];
+  }
+
+  /**
+   * push forceCloseAllWorkingTsFileProcessors down to all sg
+   */
+  public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
+      if(storageGroupProcessor != null){
+        storageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
+      }
+    }
+  }
+
+  /**
+   * push syncCloseAllWorkingTsFileProcessors down to all sg
+   */
+  public void syncCloseAllWorkingTsFileProcessors(){
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
+      if(storageGroupProcessor != null){
+        storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+      }
+    }
+  }
+
+  /**
+   * push check ttl down to all sg
+   */
+  public void checkTTL(){
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
+      if(storageGroupProcessor != null){
+        storageGroupProcessor.checkFilesTTL();
+      }
+    }
+  }
+
+  /**
+   * get processor from device id
+   * @param partialPath device path
+   * @return virtual storage group processor
+   */
+  @SuppressWarnings("java:S2445")
+  // actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
+  public StorageGroupProcessor getProcessor(PartialPath partialPath, StorageGroupMNode storageGroupMNode)
+      throws StorageGroupProcessorException, StorageEngineException {
+    int loc = partitioner.deviceToStorageGroup(partialPath);
+
+    StorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
+    if (processor == null) {
+      // if finish recover
+      if (StorageEngine.getInstance().isAllSgReady()) {
+        synchronized (storageGroupMNode) {
+          processor = virtualStorageGroupProcessor[loc];
+          if (processor == null) {
+            processor = StorageEngine.getInstance()
+                .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(loc));
+            virtualStorageGroupProcessor[loc] = processor;
+          }
+        }
+      } else {
+        // not finished recover, refuse the request
+        throw new StorageEngineException(
+            "the sg " + partialPath + " may not ready now, please wait and retry later",
+            TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+      }
+    }
+
+    return processor;
+  }
+
+  /**
+   * recover
+   * @param storageGroupMNode logical sg mnode
+   */
+  public void recover(StorageGroupMNode storageGroupMNode) throws StorageGroupProcessorException {
+    for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+      StorageGroupProcessor processor = StorageEngine.getInstance()
+          .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(i));
+      virtualStorageGroupProcessor[i] = processor;

Review comment:
       Maybe different partitions can be recovered in parallel.

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
##########
@@ -174,7 +174,7 @@ private static void insertData() throws ClassNotFoundException {
         sqls.add(String.format("INSERT INTO root.test%d(timestamp, s0) VALUES (%d, %d)", j,
             i * partitionInterval, i * partitionInterval));
       }
-      sqls.add("MERGE");
+      // sqls.add("MERGE");

Review comment:
       Does this merge cause any trouble?

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
##########
@@ -202,8 +209,8 @@ public void loadSequenceTsfileTest() throws SQLException {
           StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
               .getSequenceFileTreeSet());
       File tmpDir = new File(
-          resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + new PartialPath("root.vehicle") + File.separator + "0");
+          resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),

Review comment:
       Consider using `getSGByEngingFile` in `StorageEngine`.

##########
File path: server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
##########
@@ -102,9 +106,11 @@ public void loadNewTsfiles() throws IOException, StorageEngineException, Illegal
         correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
         String rand = String.valueOf(r.nextInt(10000));
         String fileName =
-            getSnapshotFolder() + File.separator + SG_NAME + i + File.separator
+            getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0" + File.separator + "0" + File.separator
                 + (time + i * 100 + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
                 + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
+        LOGGER.error("file name is" + fileName);

Review comment:
       Is not the level of this logs too high?

##########
File path: server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
##########
@@ -29,6 +29,7 @@
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;

Review comment:
       This import is added while there are no other changes to this file, please take a look.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public interface VirtualPartitioner {
+
+  /**
+   * use device id to determine storage group id
+   *
+   * @param deviceId device id
+   * @return virtual storage group id
+   */
+  public int deviceToStorageGroup(PartialPath deviceId);
+
+  /**
+   * release resource
+   */
+  public void clear();

Review comment:
       Is this interface necessary as there is no meaningful implementation of it? What resources does it intend to hold?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org