You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/30 11:57:19 UTC

[iotdb] branch virtual_partition created (now 983bc70)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 983bc70  explore partition

This branch includes the following new commits:

     new 79a985d  initial explore
     new 983bc70  explore partition

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: initial explore

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79a985d184cc2953b6fa5729621d7bce9e9f4e0a
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Nov 30 18:46:09 2020 +0800

    initial explore
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 378 ++++++++++++---------
 .../iotdb/db/engine/merge/task/MergeTask.java      |  42 ++-
 .../storagegroup/HashVirtualPartitioner.java       |  80 +++++
 .../engine/storagegroup/StorageGroupProcessor.java |  13 +-
 .../db/engine/storagegroup/VirtualPartitioner.java |  32 ++
 .../apache/iotdb/db/engine/merge/MergeLogTest.java |   5 +
 .../iotdb/db/engine/merge/MergeOverLapTest.java    |   5 +
 .../iotdb/db/engine/merge/MergeTaskTest.java       |   6 +-
 .../engine/modification/DeletionFileNodeTest.java  |   4 +
 .../storagegroup/StorageGroupProcessorTest.java    |   5 +
 .../iotdb/db/integration/IoTDBAggregationIT.java   |   4 -
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |   4 +
 .../db/integration/IoTDBRemovePartitionIT.java     |   5 +
 .../iotdb/db/integration/IoTDBRestartIT.java       |   6 +
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   2 +
 16 files changed, 424 insertions(+), 180 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a1a922b..5f94d9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -782,6 +782,11 @@ public class IoTDBConfig {
    */
   private boolean debugState = false;
 
+  /**
+   * whether virtual partitioning is enabled
+   */
+  private boolean enableVirtualPartition = true;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2093,4 +2098,12 @@ public class IoTDBConfig {
   public void setDefaultIndexWindowRange(int defaultIndexWindowRange) {
     this.defaultIndexWindowRange = defaultIndexWindowRange;
   }
+
+  public boolean isEnableVirtualPartition() {
+    return enableVirtualPartition;
+  }
+
+  public void setEnableVirtualPartition(boolean enableVirtualPartition) {
+    this.enableVirtualPartition = enableVirtualPartition;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index ddb0895..2a93fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -49,10 +49,12 @@ import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualPartitioner;
 import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.ShutdownException;
@@ -67,7 +69,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
-import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -89,70 +90,42 @@ import org.slf4j.LoggerFactory;
 
 public class StorageEngine implements IService {
 
-  private final Logger logger;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
-
+  private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+  // intentionally not final for tests; the pool is re-created in recover()
+  private static ExecutorService recoveryThreadPool;
+  /**
+   * Time range for dividing storage group, the time unit is the same with IoTDB's
+   * TimestampPrecision
+   */
+  @ServerConfigConsistent
+  private static long timePartitionInterval = -1;
+  /**
+   * whether to enable data partitioning; if disabled, all data belongs to partition 0
+   */
+  @ServerConfigConsistent
+  private static boolean enablePartition =
+      IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+  private final Logger logger;
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
    * will have a subfolder under the systemDir.
    */
   private final String systemDir;
-
   /**
    * storage group name -> storage group processor
    */
   private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
-
-  private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
-      .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-
-  public boolean isAllSgReady() {
-    return isAllSgReady.get();
-  }
-
-  public void setAllSgReady(boolean allSgReady) {
-    isAllSgReady.set(allSgReady);
-  }
-
   private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
 
   private ExecutorService recoverAllSgThreadPool;
-
-  static class InstanceHolder {
-
-    private InstanceHolder() {
-      // forbidding instantiation
-    }
-
-    private static final StorageEngine INSTANCE = new StorageEngine();
-  }
-
-  public static StorageEngine getInstance() {
-    return InstanceHolder.INSTANCE;
-  }
-
   private ScheduledExecutorService ttlCheckThread;
   private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
-
   // add customized listeners here for flush and close events
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
 
-  /**
-   * Time range for dividing storage group, the time unit is the same with IoTDB's
-   * TimestampPrecision
-   */
-  @ServerConfigConsistent
-  private static long timePartitionInterval = -1;
-
-  /**
-   * whether enable data partition if disabled, all data belongs to partition 0
-   */
-  @ServerConfigConsistent
-  private static boolean enablePartition =
-      IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
-
   private StorageEngine() {
     logger = LoggerFactory.getLogger(StorageEngine.class);
     systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
@@ -176,7 +149,69 @@ public class StorageEngine implements IService {
     recover();
   }
 
+  public static StorageEngine getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static void initTimePartition() {
+    timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
+        getConfig().getPartitionInterval() * 1000L);
+  }
+
+  public static long convertMilliWithPrecision(long milliTime) {
+    long result = milliTime;
+    String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+    switch (timePrecision) {
+      case "ns":
+        result = milliTime * 1000_000L;
+        break;
+      case "us":
+        result = milliTime * 1000L;
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+
+  public static long getTimePartitionInterval() {
+    if (timePartitionInterval == -1) {
+      initTimePartition();
+    }
+    return timePartitionInterval;
+  }
+
+  @TestOnly
+  public static void setTimePartitionInterval(long timePartitionInterval) {
+    StorageEngine.timePartitionInterval = timePartitionInterval;
+  }
+
+  public static long getTimePartition(long time) {
+    return enablePartition ? time / timePartitionInterval : 0;
+  }
+
+  @TestOnly
+  public static boolean isEnablePartition() {
+    return enablePartition;
+  }
+
+  @TestOnly
+  public static void setEnablePartition(boolean enablePartition) {
+    StorageEngine.enablePartition = enablePartition;
+  }
+
+  public boolean isAllSgReady() {
+    return isAllSgReady.get();
+  }
+
+  public void setAllSgReady(boolean allSgReady) {
+    isAllSgReady.set(allSgReady);
+  }
+
   public void recover() {
+    setAllSgReady(false);
+    recoveryThreadPool = IoTDBThreadPoolFactory
+        .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
     recoverAllSgThreadPool = IoTDBThreadPoolFactory
         .newSingleThreadExecutor("Begin-Recovery-Pool");
     recoverAllSgThreadPool.submit(this::recoverAllSgs);
@@ -186,25 +221,53 @@ public class StorageEngine implements IService {
     /*
      * recover all storage group processors.
      */
-    List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
     List<Future<Void>> futures = new ArrayList<>();
-    for (StorageGroupMNode storageGroup : sgNodes) {
-      futures.add(recoveryThreadPool.submit(() -> {
-        try {
-          StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
-              storageGroup.getFullPath(), fileFlushPolicy);
-          processor.setDataTTL(storageGroup.getDataTTL());
-          processor.setCustomCloseFileListeners(customCloseFileListeners);
-          processor.setCustomFlushListeners(customFlushListeners);
-          processorMap.put(storageGroup.getPartialPath(), processor);
-          logger.info("Storage Group Processor {} is recovered successfully",
-              storageGroup.getFullPath());
-        } catch (Exception e) {
-          logger
-              .error("meet error when recovering storage group: {}", storageGroup.getFullPath(), e);
-        }
-        return null;
-      }));
+    if(!IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+      List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
+      for (StorageGroupMNode storageGroup : sgNodes) {
+        futures.add(recoveryThreadPool.submit(() -> {
+          try {
+            StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
+                storageGroup.getFullPath(), fileFlushPolicy);
+            processor.setDataTTL(storageGroup.getDataTTL());
+            processor.setCustomCloseFileListeners(customCloseFileListeners);
+            processor.setCustomFlushListeners(customFlushListeners);
+            processorMap.put(storageGroup.getPartialPath(), processor);
+            logger.info("Storage Group Processor {} is recovered successfully",
+                storageGroup.getFullPath());
+          } catch (Exception e) {
+            logger
+                .error("meet error when recovering storage group: {}", storageGroup.getFullPath(),
+                    e);
+          }
+          return null;
+        }));
+      }
+    }
+    else{
+      List<String> sgNames = new ArrayList<>();
+      for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+        sgNames.add(String.valueOf(i));
+      }
+
+      for (String sgName : sgNames) {
+        futures.add(recoveryThreadPool.submit(() -> {
+          try {
+            StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
+                sgName, fileFlushPolicy);
+            processor.setCustomCloseFileListeners(customCloseFileListeners);
+            processor.setCustomFlushListeners(customFlushListeners);
+            processorMap.put(new PartialPath(sgName), processor);
+            logger.info("Storage Group Processor {} is recovered successfully",
+                sgName);
+          } catch (Exception e) {
+            logger
+                .error("meet error when recovering storage group: {}", sgName,
+                    e);
+          }
+          return null;
+        }));
+      }
     }
     for (Future<Void> future : futures) {
       try {
@@ -220,27 +283,6 @@ public class StorageEngine implements IService {
     setAllSgReady(true);
   }
 
-  private static void initTimePartition() {
-    timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
-        getConfig().getPartitionInterval() * 1000L);
-  }
-
-  public static long convertMilliWithPrecision(long milliTime) {
-    long result = milliTime;
-    String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
-    switch (timePrecision) {
-      case "ns":
-        result = milliTime * 1000_000L;
-        break;
-      case "us":
-        result = milliTime * 1000L;
-        break;
-      default:
-        break;
-    }
-    return result;
-  }
-
   @Override
   public void start() {
     ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
@@ -318,41 +360,74 @@ public class StorageEngine implements IService {
     return ServiceType.STORAGE_ENGINE_SERVICE;
   }
 
-  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+  /**
+   * Used by sync, TsFile deletion and similar operations: looks up the storage group processor
+   * directly by storage group name.
+   * @param path storage group path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+      throws StorageEngineException {
     PartialPath storageGroupPath;
     try {
       StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
       storageGroupPath = storageGroupMNode.getPartialPath();
-      StorageGroupProcessor processor = processorMap.get(storageGroupPath);
-      if (processor == null) {
-        // if finish recover
-        if (isAllSgReady.get()) {
-          synchronized (storageGroupMNode) {
-            processor = processorMap.get(storageGroupPath);
-            if (processor == null) {
-              logger.info("construct a processor instance, the storage group is {}, Thread is {}",
-                  storageGroupPath, Thread.currentThread().getId());
-              processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
-                  fileFlushPolicy);
-              processor.setDataTTL(storageGroupMNode.getDataTTL());
-              processor.setCustomFlushListeners(customFlushListeners);
-              processor.setCustomCloseFileListeners(customCloseFileListeners);
-              processorMap.put(storageGroupPath, processor);
-            }
-          }
-        } else {
-          // not finished recover, refuse the request
-          throw new StorageEngineException(
-              "the sg " + storageGroupPath + " may not ready now, please wait and retry later",
-              TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
-        }
+      return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+  /**
+   * Used by insert, query and similar operations; may return a virtual storage group processor.
+   * @param path device path
+   * @return storage group processor
+   */
+  public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+    PartialPath storageGroupPath;
+    try {
+      StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+        storageGroupPath = partitioner.deviceToStorageGroup(path);
+      } else {
+        storageGroupPath = storageGroupMNode.getPartialPath();
       }
-      return processor;
+      return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
     } catch (StorageGroupProcessorException | MetadataException e) {
       throw new StorageEngineException(e);
     }
   }
 
+  // get storage group processor by partial path
+  private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
+      StorageGroupMNode storageGroupMNode)
+      throws StorageGroupProcessorException, StorageEngineException {
+    StorageGroupProcessor processor = processorMap.get(storageGroupPath);
+    if (processor == null) {
+      // if finish recover
+      if (isAllSgReady.get()) {
+        synchronized (storageGroupMNode) {
+          processor = processorMap.get(storageGroupPath);
+          if (processor == null) {
+            logger.info("construct a processor instance, the storage group is {}, Thread is {}",
+                storageGroupPath, Thread.currentThread().getId());
+            processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
+                fileFlushPolicy);
+            processor.setDataTTL(storageGroupMNode.getDataTTL());
+            processor.setCustomFlushListeners(customFlushListeners);
+            processor.setCustomCloseFileListeners(customCloseFileListeners);
+            processorMap.put(storageGroupPath, processor);
+          }
+        }
+      } else {
+        // not finished recover, refuse the request
+        throw new StorageEngineException(
+            "the sg " + storageGroupPath + " may not ready now, please wait and retry later",
+            TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+      }
+    }
+    return processor;
+  }
 
   /**
    * This function is just for unit test.
@@ -361,7 +436,6 @@ public class StorageEngine implements IService {
     processorMap.clear();
   }
 
-
   /**
    * insert an InsertRowPlan to a storage group.
    *
@@ -520,11 +594,18 @@ public class StorageEngine implements IService {
   public void delete(PartialPath path, long startTime, long endTime, long planIndex)
       throws StorageEngineException {
     try {
-      List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
-      for (PartialPath storageGroupPath : sgPaths) {
-        StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
-        PartialPath newPath = path.alterPrefixPath(storageGroupPath);
-        storageGroupProcessor.delete(newPath, startTime, endTime, planIndex);
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+        // Distribute the request to every storage group processor; this can be optimized later.
+        for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+          storageGroupProcessor.delete(path, startTime, endTime, planIndex);
+        }
+      } else {
+        List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
+        for (PartialPath storageGroupPath : sgPaths) {
+          StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+          PartialPath newPath = path.alterPrefixPath(storageGroupPath);
+          storageGroupProcessor.delete(newPath, startTime, endTime, planIndex);
+        }
       }
     } catch (IOException | MetadataException e) {
       throw new StorageEngineException(e.getMessage());
@@ -537,10 +618,17 @@ public class StorageEngine implements IService {
   public void deleteTimeseries(PartialPath path, long planIndex)
       throws StorageEngineException {
     try {
-      for (PartialPath storageGroupPath : IoTDB.metaManager.searchAllRelatedStorageGroups(path)) {
-        StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
-        PartialPath newPath = path.alterPrefixPath(storageGroupPath);
-        storageGroupProcessor.delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+        // Distribute the request to every storage group processor; this can be optimized later.
+        for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+          storageGroupProcessor.delete(path, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+        }
+      } else {
+        for (PartialPath storageGroupPath : IoTDB.metaManager.searchAllRelatedStorageGroups(path)) {
+          StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+          PartialPath newPath = path.alterPrefixPath(storageGroupPath);
+          storageGroupProcessor.delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+        }
       }
     } catch (IOException | MetadataException e) {
       throw new StorageEngineException(e.getMessage());
@@ -646,7 +734,7 @@ public class StorageEngine implements IService {
 
   public void loadNewTsFileForSync(TsFileResource newTsFileResource)
       throws StorageEngineException, LoadFileException, IllegalPathException {
-    getProcessor(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
+    getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
         .loadNewTsFileForSync(newTsFileResource);
   }
 
@@ -659,24 +747,24 @@ public class StorageEngine implements IService {
     String device = deviceMap.keySet().iterator().next();
     PartialPath devicePath = new PartialPath(device);
     PartialPath storageGroupPath = IoTDB.metaManager.getStorageGroupPath(devicePath);
-    getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
+    getProcessorDirectly(storageGroupPath).loadNewTsFile(newTsFileResource);
   }
 
   public boolean deleteTsfileForSync(File deletedTsfile)
       throws StorageEngineException, IllegalPathException {
-    return getProcessor(new PartialPath(deletedTsfile.getParentFile().getName()))
+    return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName()))
         .deleteTsfile(deletedTsfile);
   }
 
   public boolean deleteTsfile(File deletedTsfile)
       throws StorageEngineException, IllegalPathException {
-    return getProcessor(new PartialPath(getSgByEngineFile(deletedTsfile)))
+    return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
         .deleteTsfile(deletedTsfile);
   }
 
   public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
       throws StorageEngineException, IllegalPathException {
-    return getProcessor(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
+    return getProcessorDirectly(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
         .moveTsfile(tsfileToBeMoved, targetDir);
   }
 
@@ -722,22 +810,6 @@ public class StorageEngine implements IService {
     return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
   }
 
-  public static long getTimePartitionInterval() {
-    if (timePartitionInterval == -1) {
-      initTimePartition();
-    }
-    return timePartitionInterval;
-  }
-
-  @TestOnly
-  public static void setTimePartitionInterval(long timePartitionInterval) {
-    StorageEngine.timePartitionInterval = timePartitionInterval;
-  }
-
-  public static long getTimePartition(long time) {
-    return enablePartition ? time / timePartitionInterval : 0;
-  }
-
   /**
    * Set the version of given partition to newMaxVersion if it is larger than the current version.
    *
@@ -748,13 +820,12 @@ public class StorageEngine implements IService {
   public void setPartitionVersionToMax(PartialPath storageGroup, long partitionId,
       long newMaxVersion)
       throws StorageEngineException {
-    getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
+    getProcessorDirectly(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
   }
 
-
   public void removePartitions(PartialPath storageGroupPath, TimePartitionFilter filter)
       throws StorageEngineException {
-    getProcessor(storageGroupPath).removePartitions(filter);
+    getProcessorDirectly(storageGroupPath).removePartitions(filter);
   }
 
   public Map<PartialPath, StorageGroupProcessor> getProcessorMap() {
@@ -787,16 +858,6 @@ public class StorageEngine implements IService {
     return res;
   }
 
-  @TestOnly
-  public static void setEnablePartition(boolean enablePartition) {
-    StorageEngine.enablePartition = enablePartition;
-  }
-
-  @TestOnly
-  public static boolean isEnablePartition() {
-    return enablePartition;
-  }
-
   /**
    * Add a listener to listen flush start/end events. Notice that this addition only applies to
    * TsFileProcessors created afterwards.
@@ -824,7 +885,7 @@ public class StorageEngine implements IService {
       throws StorageEngineException {
     Set<StorageGroupProcessor> set = new HashSet<>();
     for (PartialPath path : pathList) {
-      set.add(getProcessor(path));
+      set.add(getProcessor(path.getDevicePath()));
     }
     List<StorageGroupProcessor> list = set.stream()
         .sorted(Comparator.comparing(StorageGroupProcessor::getStorageGroupName))
@@ -839,4 +900,13 @@ public class StorageEngine implements IService {
   public void mergeUnLock(List<StorageGroupProcessor> list) {
     list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
   }
+
+  static class InstanceHolder {
+
+    private static final StorageEngine INSTANCE = new StorageEngine();
+
+    private InstanceHolder() {
+      // forbidding instantiation
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 8490410..f4d0a33 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -29,10 +29,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeContext;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualPartitioner;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -45,31 +48,28 @@ import org.slf4j.LoggerFactory;
 
 /**
  * MergeTask merges given seqFiles and unseqFiles into new ones, which basically consists of three
- * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
- *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
- *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
- *        3. remove unseqFiles
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files 2. move the
+ * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
+ * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
  */
 public class MergeTask implements Callable<Void> {
 
   public static final String MERGE_SUFFIX = ".merge";
   private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+  private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
 
   MergeResource resource;
   String storageGroupSysDir;
   String storageGroupName;
   MergeLogger mergeLogger;
   MergeContext mergeContext = new MergeContext();
-
-  private MergeCallback callback;
   int concurrentMergeSeriesNum;
   String taskName;
   boolean fullMerge;
-
   States states = States.START;
-
   MergeMultiChunkTask chunkTask;
   MergeFileTask fileTask;
+  private MergeCallback callback;
 
   MergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
@@ -126,7 +126,15 @@ public class MergeTask implements Callable<Void> {
 
     mergeLogger.logFiles(resource);
 
-    Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+    Set<PartialPath> devices;
+
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+      devices = partitioner.storageGroupToDevice(new PartialPath(storageGroupName));
+    } else {
+      devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+    }
+    System.out.println(devices);
+
     Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>();
     List<PartialPath> unmergedSeries = new ArrayList<>();
     for (PartialPath device : devices) {
@@ -212,14 +220,6 @@ public class MergeTask implements Callable<Void> {
     return storageGroupName;
   }
 
-  enum States {
-    START,
-    MERGE_CHUNKS,
-    MERGE_FILES,
-    CLEAN_UP,
-    ABORTED
-  }
-
   public String getProgress() {
     switch (states) {
       case ABORTED:
@@ -239,4 +239,12 @@ public class MergeTask implements Callable<Void> {
   public String getTaskName() {
     return taskName;
   }
+
+  enum States {
+    START,
+    MERGE_CHUNKS,
+    MERGE_FILES,
+    CLEAN_UP,
+    ABORTED
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
new file mode 100644
index 0000000..886ecb6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public class HashVirtualPartitioner implements VirtualPartitioner {
+
+  public static final int STORGARE_GROUP_NUM = 2;
+  HashMap<Integer, Set<PartialPath>> sgToDevice;
+
+  private HashVirtualPartitioner() {
+    sgToDevice = new HashMap<>();
+  }
+
+  public static HashVirtualPartitioner getInstance() {
+    return HashVirtualPartitionerHolder.INSTANCE;
+  }
+
+  private int toPartitionId(PartialPath deviceId){
+    return deviceId.hashCode() % STORGARE_GROUP_NUM;
+  }
+
+  @Override
+  public PartialPath deviceToStorageGroup(PartialPath deviceId) {
+    int partitionId = toPartitionId(deviceId);
+    sgToDevice.computeIfAbsent(partitionId, id -> new HashSet<>()).add(deviceId);
+    try {
+      return new PartialPath("" + partitionId);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+    }
+
+    return null;
+  }
+
+  @Override
+  public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup) {
+    return sgToDevice.get(Integer.parseInt(storageGroup.getFullPath()));
+  }
+
+  @Override
+  public void clear(){
+    sgToDevice.clear();
+  }
+
+  @Override
+  public int getPartitionCount() {
+    return STORGARE_GROUP_NUM;
+  }
+
+  private static class HashVirtualPartitionerHolder {
+
+    private static final HashVirtualPartitioner INSTANCE = new HashVirtualPartitioner();
+
+    private HashVirtualPartitionerHolder() {
+      // allowed to do nothing
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69aec66..ade474e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1514,11 +1514,16 @@ public class StorageGroupProcessor {
             lastUpdateTime = curTime;
           }
         }
-        // There is no tsfile data, the delete operation is invalid
-        if (lastUpdateTime == null) {
-          logger.debug("No device {} in SG {}, deletion invalid", device, storageGroupName);
-          return;
+
+        // virtual partitioning pushes the deletion to every storage group, so skip the early return
+        if(!IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()){
+          // There is no tsfile data, the delete operation is invalid
+          if (lastUpdateTime == null) {
+            logger.debug("No device {} in SG {}, deletion invalid", device, storageGroupName);
+            return;
+          }
         }
+
         // delete Last cache record if necessary
         tryToDeleteLastCache(device, path, startTime, endTime);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
new file mode 100644
index 0000000..09bd9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.Set;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public interface VirtualPartitioner {
+    public PartialPath deviceToStorageGroup(PartialPath deviceId);
+
+    public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
+
+    public void clear();
+
+    public int getPartitionCount();
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 56d7eae..86baa7e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -43,18 +43,23 @@ import org.junit.Test;
 public class MergeLogTest extends MergeTest {
 
   File tempSGDir;
+  private static boolean enableVirtualPartition = false;
+
 
   @Before
   public void setUp() throws IOException, WriteProcessException, MetadataException {
     super.setUp();
     tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
     tempSGDir.mkdirs();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
     super.tearDown();
     FileUtils.deleteDirectory(tempSGDir);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index d841fd0..894784f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.task.MergeTask;
@@ -54,6 +55,7 @@ import org.junit.Test;
 public class MergeOverLapTest extends MergeTest {
 
   private File tempSGDir;
+  private static boolean enableVirtualPartition = false;
 
   @Before
   public void setUp()
@@ -62,12 +64,15 @@ public class MergeOverLapTest extends MergeTest {
     super.setUp();
     tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
     tempSGDir.mkdirs();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
     super.tearDown();
     FileUtils.deleteDirectory(tempSGDir);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index fb73a27..6611fa3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -48,18 +48,22 @@ import org.junit.Test;
 public class MergeTaskTest extends MergeTest {
 
   private File tempSGDir;
+  private static boolean enableVirtualPartition = false;
+
 
   @Before
   public void setUp() throws IOException, WriteProcessException, MetadataException, MetadataException {
     super.setUp();
     tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
-    tempSGDir.mkdirs();
+    tempSGDir.mkdirs();IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
     super.tearDown();
     FileUtils.deleteDirectory(tempSGDir);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 98c104d..69ddcf5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -66,6 +66,7 @@ public class DeletionFileNodeTest {
   private static String[] measurements = new String[10];
   private TSDataType dataType = TSDataType.DOUBLE;
   private TSEncoding encoding = TSEncoding.PLAIN;
+  private static boolean enableVirtualPartition = false;
 
   private int prevUnseqLevelNum = 0;
 
@@ -89,12 +90,15 @@ public class DeletionFileNodeTest {
           encoding, TSFileDescriptor.getInstance().getConfig().getCompressor(),
           Collections.emptyMap());
     }
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
   }
 
   @After
   public void teardown() throws IOException, StorageEngineException {
     EnvironmentUtils.cleanEnv();
     IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   private void insertToStorageEngine(TSRecord record)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..f115f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -62,6 +62,8 @@ public class StorageGroupProcessorTest {
   private String measurementId = "s0";
   private StorageGroupProcessor processor;
   private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
+  private static boolean enableVirtualPartition = false;
+
 
   @Before
   public void setUp() throws Exception {
@@ -71,6 +73,8 @@ public class StorageGroupProcessorTest {
     EnvironmentUtils.envSetUp();
     processor = new DummySGP(systemDir, storageGroup);
     MergeManager.getINSTANCE().start();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
   }
 
   @After
@@ -82,6 +86,7 @@ public class StorageGroupProcessorTest {
     EnvironmentUtils.cleanEnv();
     IoTDBDescriptor.getInstance().getConfig()
         .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   private void insertToStorageGroupProcessor(TSRecord record)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index bdeb6a4..607a1a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -50,9 +50,6 @@ public class IoTDBAggregationIT {
   private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
 
   private static String[] creationSqls = new String[]{
-      "SET STORAGE GROUP TO root.vehicle.d0",
-      "SET STORAGE GROUP TO root.vehicle.d1",
-
       "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
       "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
       "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
@@ -60,7 +57,6 @@ public class IoTDBAggregationIT {
       "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN"
   };
   private static String[] dataSet2 = new String[]{
-      "SET STORAGE GROUP TO root.ln.wf01.wt01",
       "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
       "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN",
       "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 319fdd1..3e5a075 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -117,6 +117,7 @@ public class IoTDBLoadExternalTsfileIT {
   private static final String TEST_D0_S0_STR = "root.test.d0.s0";
   private static final String TEST_D0_S1_STR = "root.test.d0.s1";
   private static final String TEST_D1_STR = "root.test.d1.g0.s0";
+  private static boolean enableVirtualPartition = false;
 
   private static String[] deleteSqls = new String[]{
       "DELETE STORAGE GROUP root.vehicle",
@@ -131,6 +132,8 @@ public class IoTDBLoadExternalTsfileIT {
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
     prepareData(insertSequenceSqls);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
   }
 
   @After
@@ -138,6 +141,7 @@ public class IoTDBLoadExternalTsfileIT {
     EnvironmentUtils.cleanEnv();
     IoTDBDescriptor.getInstance().getConfig()
         .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index 860ff45..d9176c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -29,6 +29,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -42,6 +43,7 @@ import org.junit.Test;
 public class IoTDBRemovePartitionIT {
 
   private static int partitionInterval = 100;
+  private static boolean enableVirtualPartition = false;
 
   @Before
   public void setUp() throws Exception {
@@ -49,6 +51,8 @@ public class IoTDBRemovePartitionIT {
     EnvironmentUtils.envSetUp();
     StorageEngine.setEnablePartition(true);
     StorageEngine.setTimePartitionInterval(partitionInterval);
+    enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
     insertData();
   }
 
@@ -57,6 +61,7 @@ public class IoTDBRemovePartitionIT {
     StorageEngine.setEnablePartition(false);
     StorageEngine.setTimePartitionInterval(-1);
     EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 5b42f1e..73b244d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -113,6 +114,11 @@ public class IoTDBRestartIT {
 
     try {
       EnvironmentUtils.restartDaemon();
+      StorageEngine.getInstance().recover();
+      // wait for recovery to finish
+      while(!StorageEngine.getInstance().isAllSgReady()){
+        Thread.sleep(500);
+      }
     } catch (Exception e) {
       Assert.fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c1ea53e..8731f44 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -120,6 +121,7 @@ public class EnvironmentUtils {
     }
     // close metadata
     IoTDB.metaManager.clear();
+    HashVirtualPartitioner.getInstance().clear();
 
     // close tracing
     if (config.isEnablePerformanceTracing()) {


[iotdb] 02/02: explore partition

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 983bc7025204fe996c7cf7a9baeeeafb94e8c03c
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Nov 30 19:56:29 2020 +0800

    explore partition
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  1 +
 .../storagegroup/HashVirtualPartitioner.java       | 53 ++++++++++++++++------
 .../db/engine/storagegroup/VirtualPartitioner.java | 34 ++++++++++++--
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  4 +-
 5 files changed, 84 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5f94d9b..de51e23 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -787,6 +787,11 @@ public class IoTDBConfig {
    */
   private boolean enableVirtualPartition = true;
 
+  /**
+   * the number of virtual partitions
+   */
+  private int virtualPartitionNum = 2;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2106,4 +2111,12 @@ public class IoTDBConfig {
   public void setEnableVirtualPartition(boolean enableVirtualPartition) {
     this.enableVirtualPartition = enableVirtualPartition;
   }
+
+  public int getVirtualPartitionNum() {
+    return virtualPartitionNum;
+  }
+
+  public void setVirtualPartitionNum(int virtualPartitionNum) {
+    this.virtualPartitionNum = virtualPartitionNum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 2a93fab..3e1cc15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -210,6 +210,7 @@ public class StorageEngine implements IService {
 
   public void recover() {
     setAllSgReady(false);
+    partitioner.recover();
     recoveryThreadPool = IoTDBThreadPoolFactory
         .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
     recoverAllSgThreadPool = IoTDBThreadPoolFactory
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
index 886ecb6..db718cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
@@ -18,35 +18,48 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 
 public class HashVirtualPartitioner implements VirtualPartitioner {
 
-  public static final int STORGARE_GROUP_NUM = 2;
-  HashMap<Integer, Set<PartialPath>> sgToDevice;
+  public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
+      .getVirtualPartitionNum();
+
+  // storage id -> set (device id)
+  private final Set<PartialPath>[] sgToDevice;
 
   private HashVirtualPartitioner() {
-    sgToDevice = new HashMap<>();
+    sgToDevice = new Set[STORAGE_GROUP_NUM];
+    for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+      sgToDevice[i] = new HashSet<>();
+    }
   }
 
   public static HashVirtualPartitioner getInstance() {
     return HashVirtualPartitionerHolder.INSTANCE;
   }
 
-  private int toPartitionId(PartialPath deviceId){
-    return deviceId.hashCode() % STORGARE_GROUP_NUM;
-  }
-
   @Override
   public PartialPath deviceToStorageGroup(PartialPath deviceId) {
-    int partitionId = toPartitionId(deviceId);
-    sgToDevice.computeIfAbsent(partitionId, id -> new HashSet<>()).add(deviceId);
+    int storageGroupId = toStorageGroupId(deviceId);
+
+    // check whether the mapping between device id and storage group id is already recorded
+    if (!sgToDevice[storageGroupId].contains(deviceId)) {
+      synchronized (sgToDevice) {
+        // double check
+        if (!sgToDevice[storageGroupId].add(deviceId)) {
+          // add new mapping to file
+          // TODO write to file
+        }
+      }
+    }
+
     try {
-      return new PartialPath("" + partitionId);
+      return new PartialPath(String.valueOf(storageGroupId));
     } catch (IllegalPathException e) {
       e.printStackTrace();
     }
@@ -56,17 +69,27 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
 
   @Override
   public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup) {
-    return sgToDevice.get(Integer.parseInt(storageGroup.getFullPath()));
+    return sgToDevice[Integer.parseInt(storageGroup.getFullPath())];
   }
 
   @Override
-  public void clear(){
-    sgToDevice.clear();
+  public void clear() {
+    for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+      sgToDevice[i] = new HashSet<>();
+    }
   }
 
   @Override
   public int getPartitionCount() {
-    return STORGARE_GROUP_NUM;
+    return STORAGE_GROUP_NUM;
+  }
+
+  public void recover() {
+
+  }
+
+  private int toStorageGroupId(PartialPath deviceId) {
+    return Math.abs(deviceId.hashCode() % STORAGE_GROUP_NUM);
   }
 
   private static class HashVirtualPartitionerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
index 09bd9e6..bc54254 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
@@ -22,11 +22,37 @@ import java.util.Set;
 import org.apache.iotdb.db.metadata.PartialPath;
 
 public interface VirtualPartitioner {
-    public PartialPath deviceToStorageGroup(PartialPath deviceId);
 
-    public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
+  /**
+   * use device id to determine storage group id
+   *
+   * @param deviceId device id
+   * @return virtual storage group id
+   */
+  public PartialPath deviceToStorageGroup(PartialPath deviceId);
 
-    public void clear();
+  /**
+   * use storage group id to get all device ids within this storage group
+   *
+   * @param storageGroup storage group id
+   * @return all device ids within this storage group
+   */
+  public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
 
-    public int getPartitionCount();
+  /**
+   * release resource
+   */
+  public void clear();
+
+  /**
+   * get the total number of virtual storage groups
+   *
+   * @return total number of virtual storage groups
+   */
+  public int getPartitionCount();
+
+  /**
+   * recover virtual partitioner
+   */
+  public void recover();
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3e5a075..74cbd93 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -130,10 +130,10 @@ public class IoTDBLoadExternalTsfileIT {
         .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    prepareData(insertSequenceSqls);
     enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
     IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData(insertSequenceSqls);
   }
 
   @After