You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/09 04:52:19 UTC
[iotdb] branch virtual_partition_2 updated: refactor
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/virtual_partition_2 by this push:
new 538bb08 refactor
538bb08 is described below
commit 538bb08ba67a775b428e30ba742f98c7c99d92ea
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 9 12:51:58 2020 +0800
refactor
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 64 +++++++++++-----------
...eGroup.java => VirtualStorageGroupManager.java} | 47 +++++++++++-----
2 files changed, 65 insertions(+), 46 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 861a584..a492962 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -55,7 +55,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualPartitioner;
-import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroup;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.ShutdownException;
@@ -117,7 +117,7 @@ public class StorageEngine implements IService {
/**
* storage group name -> storage group processor
*/
- private final ConcurrentHashMap<PartialPath, VirtualStorageGroup> processorMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<PartialPath, VirtualStorageGroupManager> processorMap = new ConcurrentHashMap<>();
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
private ExecutorService recoverAllSgThreadPool;
@@ -275,9 +275,9 @@ public class StorageEngine implements IService {
for (StorageGroupMNode storageGroup : sgNodes) {
futures.add(recoveryThreadPool.submit(() -> {
try {
- VirtualStorageGroup virtualStorageGroup = new VirtualStorageGroup();
- virtualStorageGroup.recover(storageGroup);
- processorMap.put(storageGroup.getPartialPath(), virtualStorageGroup);
+ VirtualStorageGroupManager virtualStorageGroupManager = new VirtualStorageGroupManager();
+ virtualStorageGroupManager.recover(storageGroup);
+ processorMap.put(storageGroup.getPartialPath(), virtualStorageGroupManager);
logger.info("Storage Group Processor {} is recovered successfully",
storageGroup.getFullPath());
@@ -300,7 +300,7 @@ public class StorageEngine implements IService {
private void checkTTL() {
try {
- for (VirtualStorageGroup processor : processorMap.values()) {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
processor.checkTTL();
}
} catch (ConcurrentModificationException e) {
@@ -406,15 +406,15 @@ public class StorageEngine implements IService {
private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
StorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
- VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroup == null) {
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
// if finish recover
if (isAllSgReady.get()) {
synchronized (storageGroupMNode) {
- virtualStorageGroup = processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroup == null) {
- virtualStorageGroup = new VirtualStorageGroup();
- processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroup);
+ virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
+ virtualStorageGroupManager = new VirtualStorageGroupManager();
+ processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
}
}
} else {
@@ -424,7 +424,7 @@ public class StorageEngine implements IService {
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
- return virtualStorageGroup.getProcessor(storageGroupPath, storageGroupMNode);
+ return virtualStorageGroupManager.getProcessor(storageGroupPath, storageGroupMNode);
}
public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath storageGroupPath,
@@ -504,14 +504,14 @@ public class StorageEngine implements IService {
*/
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
- for (VirtualStorageGroup processor : processorMap.values()) {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
processor.syncCloseAllWorkingTsFileProcessors();
}
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
logger.info("Start force closing all storage group processor");
- for (VirtualStorageGroup processor : processorMap.values()) {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
processor.forceCloseAllWorkingTsFileProcessors();
}
}
@@ -522,8 +522,8 @@ public class StorageEngine implements IService {
return;
}
- VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
- for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+ for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
if (processor == null) {
continue;
}
@@ -578,8 +578,8 @@ public class StorageEngine implements IService {
return;
}
- VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
- for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+ for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
if (processor != null) {
logger
.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
@@ -687,8 +687,8 @@ public class StorageEngine implements IService {
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
- for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
if (storageGroupProcessor != null) {
totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
}
@@ -707,8 +707,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
- for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
if (storageGroupProcessor != null) {
storageGroupProcessor.upgrade();
}
@@ -726,8 +726,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException("Current system mode is read only, does not support merge");
}
- for (VirtualStorageGroup virtualStorageGroup : processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()) {
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()) {
if (storageGroupProcessor != null) {
storageGroupProcessor.merge(fullMerge);
}
@@ -787,8 +787,8 @@ public class StorageEngine implements IService {
}
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
- VirtualStorageGroup virtualStorageGroup = processorMap.remove(storageGroupPath);
- for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.remove(storageGroupPath);
+ for (StorageGroupProcessor processor : virtualStorageGroupManager.getAllPartition()) {
if (processor != null) {
processor.deleteFolder(systemDir + File.pathSeparator + storageGroupPath);
}
@@ -847,7 +847,7 @@ public class StorageEngine implements IService {
*/
public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
- for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+ for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()){
if(storageGroupProcessor != null){
List<TsFileResource> allResources = storageGroupProcessor.getSequenceFileTreeSet();
@@ -873,8 +873,8 @@ public class StorageEngine implements IService {
public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
long partitionNum) {
- VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroup);
- for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()){
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroup);
+ for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupManager.getAllPartition()){
if(storageGroupProcessor != null && storageGroupProcessor.isFileAlreadyExist(tsFileResource, partitionNum)){
return true;
}
@@ -908,7 +908,7 @@ public class StorageEngine implements IService {
}
}
- public Map<PartialPath, VirtualStorageGroup> getProcessorMap() {
+ public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
return processorMap;
}
@@ -920,7 +920,7 @@ public class StorageEngine implements IService {
*/
public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
- for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+ for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()) {
if (storageGroupProcessor != null) {
List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 85a60c4..46f75c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -11,41 +11,56 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class VirtualStorageGroup {
+public class VirtualStorageGroupManager {
- private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroup.class);
+ private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
+ /**
+ * virtual storage group partitioner
+ */
VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
- StorageGroupProcessor[] virtualPartition;
+ /**
+ * all virtual storage group processor
+ */
+ StorageGroupProcessor[] virtualStorageGroupProcessor;
public StorageGroupProcessor[] getAllPartition(){
- return virtualPartition;
+ return virtualStorageGroupProcessor;
}
- public VirtualStorageGroup(){
- virtualPartition = new StorageGroupProcessor[partitioner.getPartitionCount()];
+ public VirtualStorageGroupManager(){
+ virtualStorageGroupProcessor = new StorageGroupProcessor[partitioner.getPartitionCount()];
}
+ /**
+ * push forceCloseAllWorkingTsFileProcessors down to all sg
+ */
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
- for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+ for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
if(storageGroupProcessor != null){
storageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
}
}
}
+ /**
+ * push syncCloseAllWorkingTsFileProcessors down to all sg
+ */
public void syncCloseAllWorkingTsFileProcessors(){
- for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+ for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
if(storageGroupProcessor != null){
storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
}
}
}
+ /**
+ * push check ttl down to all sg
+ */
public void checkTTL(){
- for(StorageGroupProcessor storageGroupProcessor : virtualPartition){
+ for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor){
if(storageGroupProcessor != null){
storageGroupProcessor.checkFilesTTL();
}
@@ -53,7 +68,7 @@ public class VirtualStorageGroup {
}
/**
- *
+ * get processor from device id
* @param partialPath device path
* @return virtual storage group processor
*/
@@ -61,16 +76,16 @@ public class VirtualStorageGroup {
throws StorageGroupProcessorException, StorageEngineException {
int loc = partitioner.deviceToStorageGroup(partialPath);
- StorageGroupProcessor processor = virtualPartition[loc];
+ StorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
if (processor == null) {
// if finish recover
if (StorageEngine.getInstance().isAllSgReady()) {
synchronized (storageGroupMNode) {
- processor = virtualPartition[loc];
+ processor = virtualStorageGroupProcessor[loc];
if (processor == null) {
processor = StorageEngine.getInstance()
.buildNewStorageGroupProcessor(partialPath, storageGroupMNode, String.valueOf(loc));
- virtualPartition[loc] = processor;
+ virtualStorageGroupProcessor[loc] = processor;
}
}
} else {
@@ -84,11 +99,15 @@ public class VirtualStorageGroup {
return processor;
}
+ /**
+ * recover
+ * @param storageGroupMNode logical sg mnode
+ */
public void recover(StorageGroupMNode storageGroupMNode) throws StorageGroupProcessorException {
for (int i = 0; i < partitioner.getPartitionCount(); i++) {
StorageGroupProcessor processor = StorageEngine.getInstance()
.buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(i));
- virtualPartition[i] = processor;
+ virtualStorageGroupProcessor[i] = processor;
}
}
}