You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/30 11:57:21 UTC

[iotdb] 02/02: explore partition

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 983bc7025204fe996c7cf7a9baeeeafb94e8c03c
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Nov 30 19:56:29 2020 +0800

    explore partition
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  1 +
 .../storagegroup/HashVirtualPartitioner.java       | 53 ++++++++++++++++------
 .../db/engine/storagegroup/VirtualPartitioner.java | 34 ++++++++++++--
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  4 +-
 5 files changed, 84 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5f94d9b..de51e23 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -787,6 +787,11 @@ public class IoTDBConfig {
    */
   private boolean enableVirtualPartition = true;
 
+  /**
+   * the number of virtual partition
+   */
+  private int virtualPartitionNum = 2;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2106,4 +2111,12 @@ public class IoTDBConfig {
   public void setEnableVirtualPartition(boolean enableVirtualPartition) {
     this.enableVirtualPartition = enableVirtualPartition;
   }
+
+  public int getVirtualPartitionNum() {
+    return virtualPartitionNum;
+  }
+
+  public void setVirtualPartitionNum(int virtualPartitionNum) {
+    this.virtualPartitionNum = virtualPartitionNum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 2a93fab..3e1cc15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -210,6 +210,7 @@ public class StorageEngine implements IService {
 
   public void recover() {
     setAllSgReady(false);
+    partitioner.recover();
     recoveryThreadPool = IoTDBThreadPoolFactory
         .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
     recoverAllSgThreadPool = IoTDBThreadPoolFactory
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
index 886ecb6..db718cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
@@ -18,35 +18,48 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 
 public class HashVirtualPartitioner implements VirtualPartitioner {
 
-  public static final int STORGARE_GROUP_NUM = 2;
-  HashMap<Integer, Set<PartialPath>> sgToDevice;
+  public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
+      .getVirtualPartitionNum();
+
+  // storage id -> set (device id)
+  private final Set<PartialPath>[] sgToDevice;
 
   private HashVirtualPartitioner() {
-    sgToDevice = new HashMap<>();
+    sgToDevice = new Set[STORAGE_GROUP_NUM];
+    for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+      sgToDevice[i] = new HashSet<>();
+    }
   }
 
   public static HashVirtualPartitioner getInstance() {
     return HashVirtualPartitionerHolder.INSTANCE;
   }
 
-  private int toPartitionId(PartialPath deviceId){
-    return deviceId.hashCode() % STORGARE_GROUP_NUM;
-  }
-
   @Override
   public PartialPath deviceToStorageGroup(PartialPath deviceId) {
-    int partitionId = toPartitionId(deviceId);
-    sgToDevice.computeIfAbsent(partitionId, id -> new HashSet<>()).add(deviceId);
+    int storageGroupId = toStorageGroupId(deviceId);
+
+    // check if we record the mapping between device id and storage group id
+    if (!sgToDevice[storageGroupId].contains(deviceId)) {
+      synchronized (sgToDevice) {
+        // double check
+        if (!sgToDevice[storageGroupId].add(deviceId)) {
+          // add new mapping to file
+          // TODO write to file
+        }
+      }
+    }
+
     try {
-      return new PartialPath("" + partitionId);
+      return new PartialPath(String.valueOf(storageGroupId));
     } catch (IllegalPathException e) {
       e.printStackTrace();
     }
@@ -56,17 +69,27 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
 
   @Override
   public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup) {
-    return sgToDevice.get(Integer.parseInt(storageGroup.getFullPath()));
+    return sgToDevice[Integer.parseInt(storageGroup.getFullPath())];
   }
 
   @Override
-  public void clear(){
-    sgToDevice.clear();
+  public void clear() {
+    for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+      sgToDevice[i] = new HashSet<>();
+    }
   }
 
   @Override
   public int getPartitionCount() {
-    return STORGARE_GROUP_NUM;
+    return STORAGE_GROUP_NUM;
+  }
+
+  public void recover() {
+
+  }
+
+  private int toStorageGroupId(PartialPath deviceId) {
+    return Math.abs(deviceId.hashCode() % STORAGE_GROUP_NUM);
   }
 
   private static class HashVirtualPartitionerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
index 09bd9e6..bc54254 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
@@ -22,11 +22,37 @@ import java.util.Set;
 import org.apache.iotdb.db.metadata.PartialPath;
 
 public interface VirtualPartitioner {
-    public PartialPath deviceToStorageGroup(PartialPath deviceId);
 
-    public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
+  /**
+   * use device id to determine storage group id
+   *
+   * @param deviceId device id
+   * @return virtual storage group id
+   */
+  public PartialPath deviceToStorageGroup(PartialPath deviceId);
 
-    public void clear();
+  /**
+   * use storage group id to get all device ids within this storage group
+   *
+   * @param storageGroup storage group id
+   * @return all device ids within this storage group
+   */
+  public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
 
-    public int getPartitionCount();
+  /**
+   * release resource
+   */
+  public void clear();
+
+  /**
+   * get total number of virtual storage group
+   *
+   * @return total number of virtual storage group
+   */
+  public int getPartitionCount();
+
+  /**
+   * recover virtual partitioner
+   */
+  public void recover();
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3e5a075..74cbd93 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -130,10 +130,10 @@ public class IoTDBLoadExternalTsfileIT {
         .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    prepareData(insertSequenceSqls);
     enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
     IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData(insertSequenceSqls);
   }
 
   @After