You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/01 08:30:32 UTC
[iotdb] 03/03: explore file write
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1099ec4aa336e0040885fa3ec6c16fdb9a0f29b5
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Dec 1 16:30:07 2020 +0800
explore file write
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../virtualSg/HashVirtualPartitioner.java | 42 +++++++++-
.../storagegroup/virtualSg/VirtualPartitioner.java | 5 --
.../virtualSg/VirtualPartitionerReader.java | 86 +++++++++++++++++++
.../virtualSg/VirtualPartitionerWriter.java | 94 +++++++++++++++++++++
.../virtualSg/HashVirtualPartitionerTest.java | 97 ++++++++++++++++++++++
6 files changed, 319 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index cdcb8f3..ae0e415 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -92,7 +92,7 @@ public class StorageEngine implements IService {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
- private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+ private final VirtualPartitioner partitioner;
/**
* Time range for dividing storage group, the time unit is the same with IoTDB's
@@ -146,6 +146,7 @@ public class StorageEngine implements IService {
// recover upgrade process
UpgradeUtils.recoverUpgrade();
+ partitioner = HashVirtualPartitioner.getInstance();
recover();
}
@@ -210,7 +211,6 @@ public class StorageEngine implements IService {
public void recover() {
setAllSgReady(false);
- partitioner.recover();
recoveryThreadPool = IoTDBThreadPoolFactory
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
recoverAllSgThreadPool = IoTDBThreadPoolFactory
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
index 2657a7b..8f68ec9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -23,20 +23,34 @@ import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HashVirtualPartitioner implements VirtualPartitioner {
+ private static final Logger logger = LoggerFactory.getLogger(HashVirtualPartitioner.class);
+
+
public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
.getVirtualPartitionNum();
// storage id -> set (device id)
private final Set<PartialPath>[] sgToDevice;
+ // log writer
+ private VirtualPartitionerWriter writer;
+
private HashVirtualPartitioner() {
sgToDevice = new Set[STORAGE_GROUP_NUM];
for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
sgToDevice[i] = new HashSet<>();
}
+
+ recover();
+
+ writer = new VirtualPartitionerWriter();
}
public static HashVirtualPartitioner getInstance() {
@@ -51,9 +65,9 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
if (!sgToDevice[storageGroupId].contains(deviceId)) {
synchronized (sgToDevice) {
// double check
- if (!sgToDevice[storageGroupId].add(deviceId)) {
+ if (sgToDevice[storageGroupId].add(deviceId)) {
// add new mapping to file
- // TODO write to file
+ writer.writeMapping(String.valueOf(storageGroupId), deviceId.getFullPath());
}
}
}
@@ -77,6 +91,8 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
sgToDevice[i] = new HashSet<>();
}
+ writer.clear();
+ writer = new VirtualPartitionerWriter();
}
@Override
@@ -84,8 +100,30 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
return STORAGE_GROUP_NUM;
}
+ @TestOnly
+ public void restart() {
+ for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+ sgToDevice[i] = new HashSet<>();
+ }
+
+ recover();
+ }
+
public void recover() {
+ VirtualPartitionerReader reader = new VirtualPartitionerReader();
+ Pair<String, String> mapping = null;
+ mapping = reader.readMapping();
+
+ while(mapping != null){
+ int storageGroupId = Integer.parseInt(mapping.left);
+ try {
+ sgToDevice[storageGroupId].add(new PartialPath(mapping.right));
+ } catch (IllegalPathException e) {
+ logger.error("can not recover virtual partitioner when reading: " + mapping, e);
+ }
+ mapping = reader.readMapping();
+ }
}
private int toStorageGroupId(PartialPath deviceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
index c11bae4..13411c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
@@ -50,9 +50,4 @@ public interface VirtualPartitioner {
* @return total number of virtual storage group
*/
public int getPartitionCount();
-
- /**
- * recover virtual partitioner
- */
- public void recover();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java
new file mode 100644
index 0000000..251694a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualPartitionerReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(VirtualPartitionerReader.class);
+ private static final String MAPPING_SEPARATOR = ",";
+ private static final String LOG_FILE_NAME = "VirtualSGMapping.log";
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private BufferedReader reader;
+
+ VirtualPartitionerReader() {
+ String systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
+ File logFile = SystemFileFactory.INSTANCE.getFile(systemDir + File.separator + LOG_FILE_NAME);
+ // no log file, create it
+ if (!logFile.exists()){
+ try {
+ logFile.createNewFile();
+ } catch (IOException e) {
+ logger.error("can not create virtual storage group mapping file because: ", e);
+ }
+ }
+
+ try {
+ reader = new BufferedReader(new FileReader(logFile));
+ } catch (FileNotFoundException e) {
+ logger.error("can not create virtual storage group mapping because: ", e);
+ }
+ }
+
+ /**
+ * read the mapping between virtual storage group id and device id
+ * @return mapping line
+ */
+ public Pair<String, String> readMapping() {
+ String line = null;
+ try {
+ line = reader.readLine();
+ } catch (IOException e) {
+ logger.error("can not read virtual storage group mapping because: ", e);
+ }
+ if(line == null){
+ return null;
+ }
+
+ String[] part = line.split(MAPPING_SEPARATOR);
+
+ return new Pair<>(part[0], part[1]);
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java
new file mode 100644
index 0000000..42d3182
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualPartitionerWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(VirtualPartitionerWriter.class);
+ private static final String MAPPING_SEPARATOR = ",";
+ private static final String LINE_SEPARATOR = System.lineSeparator();
+ private static final String LOG_FILE_NAME = "VirtualSGMapping.log";
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private File logFile;
+ private FileOutputStream fileOutputStream;
+ private FileChannel channel;
+
+ VirtualPartitionerWriter() {
+ String systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
+ logFile = SystemFileFactory.INSTANCE.getFile(systemDir + File.separator + LOG_FILE_NAME);
+
+ try {
+ fileOutputStream = new FileOutputStream(logFile, true);
+ channel = fileOutputStream.getChannel();
+ } catch (FileNotFoundException e) {
+ logger.error("can not create virtual storage group mapping because: ", e);
+ }
+ }
+
+ /**
+ * write the mapping between virtual storage group id and device id
+ *
+ * @param storageGroupId virtual storage group id
+ * @param deviceId device id
+ */
+ public void writeMapping(String storageGroupId, String deviceId) {
+ String line = storageGroupId
+ + MAPPING_SEPARATOR
+ + deviceId
+ + LINE_SEPARATOR;
+
+ try {
+ channel.write(ByteBuffer.wrap(line.getBytes()));
+ channel.force(false);
+ } catch (IOException e) {
+ logger.error("can not write virtual storage group mapping because: ", e);
+ }
+ }
+
+ public void close() {
+ try {
+ channel.force(true);
+ channel.close();
+ fileOutputStream.close();
+ } catch (IOException e) {
+ logger.error("can not close virtual storage group mapping file because: ", e);
+
+ }
+ }
+
+ public void clear() {
+ close();
+ logFile.delete();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
new file mode 100644
index 0000000..068d358
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HashVirtualPartitionerTest {
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ // init file dir
+ StorageEngine.getInstance();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void basicTest() throws IllegalPathException {
+ HashVirtualPartitioner hashVirtualPartitioner = HashVirtualPartitioner.getInstance();
+
+ // sg -> deviceId
+ HashMap<PartialPath, Set<PartialPath>> realMap = new HashMap<>();
+ PartialPath d1 = new PartialPath("root.sg1.d1");
+ PartialPath d2 = new PartialPath("root.sg1.d2");
+
+
+ PartialPath sg1 = hashVirtualPartitioner.deviceToStorageGroup(d1);
+ PartialPath sg2 = hashVirtualPartitioner.deviceToStorageGroup(d2);
+
+ realMap.computeIfAbsent(sg1, id -> new HashSet<>()).add(d1);
+ realMap.computeIfAbsent(sg2, id -> new HashSet<>()).add(d2);
+
+ for(PartialPath sg : realMap.keySet()){
+ assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+ }
+ }
+
+
+ @Test
+ public void basicRecoverTest() throws IllegalPathException {
+ HashVirtualPartitioner hashVirtualPartitioner = HashVirtualPartitioner.getInstance();
+
+ // sg -> deviceId
+ HashMap<PartialPath, Set<PartialPath>> realMap = new HashMap<>();
+ PartialPath d1 = new PartialPath("root.sg1.d1");
+ PartialPath d2 = new PartialPath("root.sg1.d2");
+
+
+ PartialPath sg1 = hashVirtualPartitioner.deviceToStorageGroup(d1);
+ PartialPath sg2 = hashVirtualPartitioner.deviceToStorageGroup(d2);
+
+ realMap.computeIfAbsent(sg1, id -> new HashSet<>()).add(d1);
+ realMap.computeIfAbsent(sg2, id -> new HashSet<>()).add(d2);
+
+ for(PartialPath sg : realMap.keySet()){
+ assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+ }
+
+ hashVirtualPartitioner.restart();
+
+ for(PartialPath sg : realMap.keySet()){
+ assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+ }
+ }
+}