You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/30 11:57:21 UTC
[iotdb] 02/02: explore partition
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 983bc7025204fe996c7cf7a9baeeeafb94e8c03c
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Nov 30 19:56:29 2020 +0800
explore partition
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 1 +
.../storagegroup/HashVirtualPartitioner.java | 53 ++++++++++++++++------
.../db/engine/storagegroup/VirtualPartitioner.java | 34 ++++++++++++--
.../db/integration/IoTDBLoadExternalTsfileIT.java | 4 +-
5 files changed, 84 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5f94d9b..de51e23 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -787,6 +787,11 @@ public class IoTDBConfig {
*/
private boolean enableVirtualPartition = true;
+ /**
+ * the number of virtual partitions
+ */
+ private int virtualPartitionNum = 2;
+
public IoTDBConfig() {
// empty constructor
}
@@ -2106,4 +2111,12 @@ public class IoTDBConfig {
public void setEnableVirtualPartition(boolean enableVirtualPartition) {
this.enableVirtualPartition = enableVirtualPartition;
}
+
+ public int getVirtualPartitionNum() {
+ return virtualPartitionNum;
+ }
+
+ public void setVirtualPartitionNum(int virtualPartitionNum) {
+ this.virtualPartitionNum = virtualPartitionNum;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 2a93fab..3e1cc15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -210,6 +210,7 @@ public class StorageEngine implements IService {
public void recover() {
setAllSgReady(false);
+ partitioner.recover();
recoveryThreadPool = IoTDBThreadPoolFactory
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
recoverAllSgThreadPool = IoTDBThreadPoolFactory
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
index 886ecb6..db718cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
@@ -18,35 +18,48 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
public class HashVirtualPartitioner implements VirtualPartitioner {
- public static final int STORGARE_GROUP_NUM = 2;
- HashMap<Integer, Set<PartialPath>> sgToDevice;
+ public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
+ .getVirtualPartitionNum();
+
+ // storage id -> set (device id)
+ private final Set<PartialPath>[] sgToDevice;
private HashVirtualPartitioner() {
- sgToDevice = new HashMap<>();
+ sgToDevice = new Set[STORAGE_GROUP_NUM];
+ for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+ sgToDevice[i] = new HashSet<>();
+ }
}
public static HashVirtualPartitioner getInstance() {
return HashVirtualPartitionerHolder.INSTANCE;
}
- private int toPartitionId(PartialPath deviceId){
- return deviceId.hashCode() % STORGARE_GROUP_NUM;
- }
-
@Override
public PartialPath deviceToStorageGroup(PartialPath deviceId) {
- int partitionId = toPartitionId(deviceId);
- sgToDevice.computeIfAbsent(partitionId, id -> new HashSet<>()).add(deviceId);
+ int storageGroupId = toStorageGroupId(deviceId);
+
+ // check whether the mapping between device id and storage group id has already been recorded
+ if (!sgToDevice[storageGroupId].contains(deviceId)) {
+ synchronized (sgToDevice) {
+ // double check
+ if (!sgToDevice[storageGroupId].add(deviceId)) {
+ // add new mapping to file
+ // TODO write to file
+ }
+ }
+ }
+
try {
- return new PartialPath("" + partitionId);
+ return new PartialPath(String.valueOf(storageGroupId));
} catch (IllegalPathException e) {
e.printStackTrace();
}
@@ -56,17 +69,27 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
@Override
public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup) {
- return sgToDevice.get(Integer.parseInt(storageGroup.getFullPath()));
+ return sgToDevice[Integer.parseInt(storageGroup.getFullPath())];
}
@Override
- public void clear(){
- sgToDevice.clear();
+ public void clear() {
+ for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+ sgToDevice[i] = new HashSet<>();
+ }
}
@Override
public int getPartitionCount() {
- return STORGARE_GROUP_NUM;
+ return STORAGE_GROUP_NUM;
+ }
+
+ public void recover() {
+
+ }
+
+ private int toStorageGroupId(PartialPath deviceId) {
+ return Math.abs(deviceId.hashCode() % STORAGE_GROUP_NUM);
}
private static class HashVirtualPartitionerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
index 09bd9e6..bc54254 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
@@ -22,11 +22,37 @@ import java.util.Set;
import org.apache.iotdb.db.metadata.PartialPath;
public interface VirtualPartitioner {
- public PartialPath deviceToStorageGroup(PartialPath deviceId);
- public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
+ /**
+ * use device id to determine storage group id
+ *
+ * @param deviceId device id
+ * @return virtual storage group id
+ */
+ public PartialPath deviceToStorageGroup(PartialPath deviceId);
- public void clear();
+ /**
+ * use storage group id to get all device ids within this storage group
+ *
+ * @param storageGroup storage group id
+ * @return all device ids within this storage group
+ */
+ public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
- public int getPartitionCount();
+ /**
+ * release resources
+ */
+ public void clear();
+
+ /**
+ * get the total number of virtual storage groups
+ *
+ * @return the total number of virtual storage groups
+ */
+ public int getPartitionCount();
+
+ /**
+ * recover virtual partitioner
+ */
+ public void recover();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3e5a075..74cbd93 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -130,10 +130,10 @@ public class IoTDBLoadExternalTsfileIT {
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
- prepareData(insertSequenceSqls);
enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData(insertSequenceSqls);
}
@After