You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/09 04:52:19 UTC

[iotdb] branch virtual_partition_2 updated: refactor

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/virtual_partition_2 by this push:
     new 538bb08  refactor
538bb08 is described below

commit 538bb08ba67a775b428e30ba742f98c7c99d92ea
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 9 12:51:58 2020 +0800

    refactor
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 64 +++++++++++-----------
 ...eGroup.java => VirtualStorageGroupManager.java} | 47 +++++++++++-----
 2 files changed, 65 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 861a584..a492962 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -55,7 +55,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
 import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualPartitioner;
-import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroup;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
 import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.ShutdownException;
@@ -117,7 +117,7 @@ public class StorageEngine implements IService {
   /**
    * storage group name -> storage group processor
    */
-  private final ConcurrentHashMap<PartialPath, VirtualStorageGroup> processorMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<PartialPath, VirtualStorageGroupManager> processorMap = new ConcurrentHashMap<>();
   private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
 
   private ExecutorService recoverAllSgThreadPool;
@@ -275,9 +275,9 @@ public class StorageEngine implements IService {
     for (StorageGroupMNode storageGroup : sgNodes) {
       futures.add(recoveryThreadPool.submit(() -> {
         try {
-          VirtualStorageGroup virtualStorageGroup = new VirtualStorageGroup();
-          virtualStorageGroup.recover(storageGroup);
-          processorMap.put(storageGroup.getPartialPath(), virtualStorageGroup);
+          VirtualStorageGroupManager virtualStorageGroupManager = new VirtualStorageGroupManager();
+          virtualStorageGroupManager.recover(storageGroup);
+          processorMap.put(storageGroup.getPartialPath(), virtualStorageGroupManager);
 
           logger.info("Storage Group Processor {} is recovered successfully",
               storageGroup.getFullPath());
@@ -300,7 +300,7 @@ public class StorageEngine implements IService {
 
   private void checkTTL() {
     try {
-      for (VirtualStorageGroup processor : processorMap.values()) {
+      for (VirtualStorageGroupManager processor : processorMap.values()) {
         processor.checkTTL();
       }
     } catch (ConcurrentModificationException e) {
@@ -406,15 +406,15 @@ public class StorageEngine implements IService {
   private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
       StorageGroupMNode storageGroupMNode)
       throws StorageGroupProcessorException, StorageEngineException {
-    VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupMNode.getPartialPath());
-    if (virtualStorageGroup == null) {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+    if (virtualStorageGroupManager == null) {
       // if finish recover
       if (isAllSgReady.get()) {
         synchronized (storageGroupMNode) {
-          virtualStorageGroup = processorMap.get(storageGroupMNode.getPartialPath());
-          if (virtualStorageGroup == null) {
-            virtualStorageGroup = new VirtualStorageGroup();
-            processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroup);
+          virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+          if (virtualStorageGroupManager == null) {
+            virtualStorageGroupManager = new VirtualStorageGroupManager();
+            processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
           }
         }
       } else {
@@ -424,7 +424,7 @@ public class StorageEngine implements IService {
             TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
       }
     }
-    return virtualStorageGroup.getProcessor(storageGroupPath, storageGroupMNode);
+    return virtualStorageGroupManager.getProcessor(storageGroupPath, storageGroupMNode);
   }
 
   public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath storageGroupPath,
@@ -504,14 +504,14 @@ public class StorageEngine implements IService {
    */
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
-    for (VirtualStorageGroup processor : processorMap.values()) {
+    for (VirtualStorageGroupManager processor : processorMap.values()) {
       processor.syncCloseAllWorkingTsFileProcessors();
     }
   }
 
   public void forceCloseAllProcessor() throws TsFileProcessorException {
     logger.info("Start force closing all storage group processor");
-    for (VirtualStorageGroup processor : processorMap.values()) {
+    for (VirtualStorageGroupManager processor : processorMap.values()) {
       processor.forceCloseAllWorkingTsFileProcessors();
     }
   }
@@ -522,8 +522,8 @@ public class StorageEngine implements IService {
       return;
     }
 
-    VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
-    for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
       if (processor == null) {
         continue;
       }
@@ -578,8 +578,8 @@ public class StorageEngine implements IService {
       return;
     }
 
-    VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
-    for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
       if (processor != null) {
         logger
             .info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
@@ -687,8 +687,8 @@ public class StorageEngine implements IService {
    */
   public int countUpgradeFiles() {
     int totalUpgradeFileNum = 0;
-    for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+    for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
         if (storageGroupProcessor != null) {
           totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
         }
@@ -707,8 +707,8 @@ public class StorageEngine implements IService {
       throw new StorageEngineException(
           "Current system mode is read only, does not support file upgrade");
     }
-    for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+    for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
         if (storageGroupProcessor != null) {
           storageGroupProcessor.upgrade();
         }
@@ -726,8 +726,8 @@ public class StorageEngine implements IService {
       throw new StorageEngineException("Current system mode is read only, does not support merge");
     }
 
-    for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+    for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+      for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
         if (storageGroupProcessor != null) {
           storageGroupProcessor.merge(fullMerge);
         }
@@ -787,8 +787,8 @@ public class StorageEngine implements IService {
     }
 
     deleteAllDataFilesInOneStorageGroup(storageGroupPath);
-    VirtualStorageGroup virtualStorageGroup = processorMap.remove(storageGroupPath);
-    for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.remove(storageGroupPath);
+    for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
       if (processor != null) {
         processor.deleteFolder(systemDir + File.pathSeparator + storageGroupPath);
       }
@@ -847,7 +847,7 @@ public class StorageEngine implements IService {
    */
   public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
     Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
-    for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+    for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
       for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()){
         if(storageGroupProcessor != null){
           List<TsFileResource> allResources = storageGroupProcessor.getSequenceFileTreeSet();
@@ -873,8 +873,8 @@ public class StorageEngine implements IService {
 
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
       long partitionNum) {
-    VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroup);
-    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()){
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroup);
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()){
       if(storageGroupProcessor != null && storageGroupProcessor.isFileAlreadyExist(tsFileResource, partitionNum)){
         return true;
       }
@@ -908,7 +908,7 @@ public class StorageEngine implements IService {
     }
   }
 
-  public Map<PartialPath, VirtualStorageGroup> getProcessorMap() {
+  public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
     return processorMap;
   }
 
@@ -920,7 +920,7 @@ public class StorageEngine implements IService {
    */
   public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
     Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
-    for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+    for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
       for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()) {
         if (storageGroupProcessor != null) {
           List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 85a60c4..46f75c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -11,41 +11,56 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class VirtualStorageGroup {
+public class VirtualStorageGroupManager {
 
-  private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroup.class);
+  private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
 
+  /**
+   * virtual storage group partitioner
+   */
   VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
 
-  StorageGroupProcessor[] virtualPartition;
+  /**
+   * all virtual storage group processor
+   */
+  StorageGroupProcessor[] virtualStorageGroupProcessor;
 
 
   public StorageGroupProcessor[] getAllPartition(){
-    return virtualPartition;
+    return virtualStorageGroupProcessor;
   }
 
-  public VirtualStorageGroup(){
-    virtualPartition = new StorageGroupProcessor[partitioner.getPartitionCount()];
+  public VirtualStorageGroupManager(){
+    virtualStorageGroupProcessor = new StorageGroupProcessor[partitioner.getPartitionCount()];
   }
 
+  /**
+   * push forceCloseAllWorkingTsFileProcessors down to all sg
+   */
   public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
-    for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
       if(storageGroupProcessor != null){
         storageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
       }
     }
   }
 
+  /**
+   * push syncCloseAllWorkingTsFileProcessors down to all sg
+   */
   public void syncCloseAllWorkingTsFileProcessors(){
-    for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
       if(storageGroupProcessor != null){
         storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
       }
     }
   }
 
+  /**
+   * push check ttl down to all sg
+   */
   public void checkTTL(){
-    for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+    for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
       if(storageGroupProcessor != null){
         storageGroupProcessor.checkFilesTTL();
       }
@@ -53,7 +68,7 @@ public class VirtualStorageGroup {
   }
 
   /**
-   *
+   * get processor from device id
    * @param partialPath device path
    * @return virtual storage group processor
    */
@@ -61,16 +76,16 @@ public class VirtualStorageGroup {
       throws StorageGroupProcessorException, StorageEngineException {
     int loc = partitioner.deviceToStorageGroup(partialPath);
 
-    StorageGroupProcessor processor = virtualPartition[loc];
+    StorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
     if (processor == null) {
       // if finish recover
       if (StorageEngine.getInstance().isAllSgReady()) {
         synchronized (storageGroupMNode) {
-          processor = virtualPartition[loc];
+          processor = virtualStorageGroupProcessor[loc];
           if (processor == null) {
             processor = StorageEngine.getInstance()
                 .buildNewStorageGroupProcessor(partialPath, storageGroupMNode, String.valueOf(loc));
-            virtualPartition[loc] = processor;
+            virtualStorageGroupProcessor[loc] = processor;
           }
         }
       } else {
@@ -84,11 +99,15 @@ public class VirtualStorageGroup {
     return processor;
   }
 
+  /**
+   * recover
+   * @param storageGroupMNode logical sg mnode
+   */
   public void recover(StorageGroupMNode storageGroupMNode) throws StorageGroupProcessorException {
     for (int i = 0; i < partitioner.getPartitionCount(); i++) {
       StorageGroupProcessor processor = StorageEngine.getInstance()
           .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(i));
-      virtualPartition[i] = processor;
+      virtualStorageGroupProcessor[i] = processor;
     }
   }
 }