You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/01 08:30:32 UTC

[iotdb] 03/03: explore file write

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1099ec4aa336e0040885fa3ec6c16fdb9a0f29b5
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Dec 1 16:30:07 2020 +0800

    explore file write
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 .../virtualSg/HashVirtualPartitioner.java          | 42 +++++++++-
 .../storagegroup/virtualSg/VirtualPartitioner.java |  5 --
 .../virtualSg/VirtualPartitionerReader.java        | 86 +++++++++++++++++++
 .../virtualSg/VirtualPartitionerWriter.java        | 94 +++++++++++++++++++++
 .../virtualSg/HashVirtualPartitionerTest.java      | 97 ++++++++++++++++++++++
 6 files changed, 319 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index cdcb8f3..ae0e415 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -92,7 +92,7 @@ public class StorageEngine implements IService {
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
-  private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+  private final VirtualPartitioner partitioner;
 
   /**
    * Time range for dividing storage group, the time unit is the same with IoTDB's
@@ -146,6 +146,7 @@ public class StorageEngine implements IService {
     // recover upgrade process
     UpgradeUtils.recoverUpgrade();
 
+    partitioner = HashVirtualPartitioner.getInstance();
     recover();
   }
 
@@ -210,7 +211,6 @@ public class StorageEngine implements IService {
 
   public void recover() {
     setAllSgReady(false);
-    partitioner.recover();
     recoveryThreadPool = IoTDBThreadPoolFactory
         .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
     recoverAllSgThreadPool = IoTDBThreadPoolFactory
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
index 2657a7b..8f68ec9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -23,20 +23,34 @@ import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HashVirtualPartitioner implements VirtualPartitioner {
 
+  private static final Logger logger = LoggerFactory.getLogger(HashVirtualPartitioner.class);
+
+
   public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
       .getVirtualPartitionNum();
 
   // storage id -> set (device id)
   private final Set<PartialPath>[] sgToDevice;
 
+  // log writer
+  private VirtualPartitionerWriter writer;
+
   private HashVirtualPartitioner() {
     sgToDevice = new Set[STORAGE_GROUP_NUM];
     for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
       sgToDevice[i] = new HashSet<>();
     }
+
+    recover();
+
+    writer = new VirtualPartitionerWriter();
   }
 
   public static HashVirtualPartitioner getInstance() {
@@ -51,9 +65,9 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
     if (!sgToDevice[storageGroupId].contains(deviceId)) {
       synchronized (sgToDevice) {
         // double check
-        if (!sgToDevice[storageGroupId].add(deviceId)) {
+        if (sgToDevice[storageGroupId].add(deviceId)) {
           // add new mapping to file
-          // TODO write to file
+          writer.writeMapping(String.valueOf(storageGroupId), deviceId.getFullPath());
         }
       }
     }
@@ -77,6 +91,8 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
     for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
       sgToDevice[i] = new HashSet<>();
     }
+    writer.clear();
+    writer = new VirtualPartitionerWriter();
   }
 
   @Override
@@ -84,8 +100,30 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
     return STORAGE_GROUP_NUM;
   }
 
+  @TestOnly
+  public void restart() {
+    for (int i = 0; i < STORAGE_GROUP_NUM; i++) {
+      sgToDevice[i] = new HashSet<>();
+    }
+
+    recover();
+  }
+
   public void recover() {
+    VirtualPartitionerReader reader = new VirtualPartitionerReader();
+    Pair<String, String> mapping = null;
+    mapping = reader.readMapping();
+
+    while(mapping != null){
+      int storageGroupId = Integer.parseInt(mapping.left);
+      try {
+        sgToDevice[storageGroupId].add(new PartialPath(mapping.right));
+      } catch (IllegalPathException e) {
+        logger.error("can not recover virtual partitioner when reading: " + mapping, e);
+      }
 
+      mapping = reader.readMapping();
+    }
   }
 
   private int toStorageGroupId(PartialPath deviceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
index c11bae4..13411c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
@@ -50,9 +50,4 @@ public interface VirtualPartitioner {
    * @return total number of virtual storage group
    */
   public int getPartitionCount();
-
-  /**
-   * recover virtual partitioner
-   */
-  public void recover();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java
new file mode 100644
index 0000000..251694a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualPartitionerReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(VirtualPartitionerReader.class);
+  private static final String MAPPING_SEPARATOR = ",";
+  private static final String LOG_FILE_NAME = "VirtualSGMapping.log";
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private BufferedReader reader;
+
+  VirtualPartitionerReader() {
+    String systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
+    File logFile = SystemFileFactory.INSTANCE.getFile(systemDir + File.separator + LOG_FILE_NAME);
+    // no log file, create it
+    if (!logFile.exists()){
+      try {
+        logFile.createNewFile();
+      } catch (IOException e) {
+        logger.error("can not create virtual storage group mapping file because: ", e);
+      }
+    }
+
+    try {
+      reader = new BufferedReader(new FileReader(logFile));
+    } catch (FileNotFoundException e) {
+      logger.error("can not create virtual storage group mapping because: ", e);
+    }
+  }
+
+  /**
+   * read the mapping between virtual storage group id and device id
+   * @return mapping line
+   */
+  public Pair<String, String> readMapping() {
+    String line = null;
+    try {
+      line = reader.readLine();
+    } catch (IOException e) {
+      logger.error("can not read virtual storage group mapping because: ", e);
+    }
+    if(line == null){
+      return null;
+    }
+
+    String[] part = line.split(MAPPING_SEPARATOR);
+
+    return new Pair<>(part[0], part[1]);
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java
new file mode 100644
index 0000000..42d3182
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitionerWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualPartitionerWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(VirtualPartitionerWriter.class);
+  private static final String MAPPING_SEPARATOR = ",";
+  private static final String LINE_SEPARATOR = System.lineSeparator();
+  private static final String LOG_FILE_NAME = "VirtualSGMapping.log";
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private File logFile;
+  private FileOutputStream fileOutputStream;
+  private FileChannel channel;
+
+  VirtualPartitionerWriter() {
+    String systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
+    logFile = SystemFileFactory.INSTANCE.getFile(systemDir + File.separator + LOG_FILE_NAME);
+
+    try {
+      fileOutputStream = new FileOutputStream(logFile, true);
+      channel = fileOutputStream.getChannel();
+    } catch (FileNotFoundException e) {
+      logger.error("can not create virtual storage group mapping because: ", e);
+    }
+  }
+
+  /**
+   * write the mapping between virtual storage group id and device id
+   *
+   * @param storageGroupId virtual storage group id
+   * @param deviceId       device id
+   */
+  public void writeMapping(String storageGroupId, String deviceId) {
+    String line = storageGroupId
+        + MAPPING_SEPARATOR
+        + deviceId
+        + LINE_SEPARATOR;
+
+    try {
+      channel.write(ByteBuffer.wrap(line.getBytes()));
+      channel.force(false);
+    } catch (IOException e) {
+      logger.error("can not write virtual storage group mapping because: ", e);
+    }
+  }
+
+  public void close() {
+    try {
+      channel.force(true);
+      channel.close();
+      fileOutputStream.close();
+    } catch (IOException e) {
+      logger.error("can not close virtual storage group mapping file because: ", e);
+
+    }
+  }
+
+  public void clear() {
+    close();
+    logFile.delete();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
new file mode 100644
index 0000000..068d358
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HashVirtualPartitionerTest {
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    // init file dir
+    StorageEngine.getInstance();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void basicTest() throws IllegalPathException {
+    HashVirtualPartitioner hashVirtualPartitioner = HashVirtualPartitioner.getInstance();
+
+    // sg -> deviceId
+    HashMap<PartialPath, Set<PartialPath>> realMap = new HashMap<>();
+    PartialPath d1 = new PartialPath("root.sg1.d1");
+    PartialPath d2 = new PartialPath("root.sg1.d2");
+
+
+    PartialPath sg1 = hashVirtualPartitioner.deviceToStorageGroup(d1);
+    PartialPath sg2 = hashVirtualPartitioner.deviceToStorageGroup(d2);
+
+    realMap.computeIfAbsent(sg1, id -> new HashSet<>()).add(d1);
+    realMap.computeIfAbsent(sg2, id -> new HashSet<>()).add(d2);
+
+    for(PartialPath sg : realMap.keySet()){
+      assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+    }
+  }
+
+
+  @Test
+  public void basicRecoverTest() throws IllegalPathException {
+    HashVirtualPartitioner hashVirtualPartitioner = HashVirtualPartitioner.getInstance();
+
+    // sg -> deviceId
+    HashMap<PartialPath, Set<PartialPath>> realMap = new HashMap<>();
+    PartialPath d1 = new PartialPath("root.sg1.d1");
+    PartialPath d2 = new PartialPath("root.sg1.d2");
+
+
+    PartialPath sg1 = hashVirtualPartitioner.deviceToStorageGroup(d1);
+    PartialPath sg2 = hashVirtualPartitioner.deviceToStorageGroup(d2);
+
+    realMap.computeIfAbsent(sg1, id -> new HashSet<>()).add(d1);
+    realMap.computeIfAbsent(sg2, id -> new HashSet<>()).add(d2);
+
+    for(PartialPath sg : realMap.keySet()){
+      assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+    }
+
+    hashVirtualPartitioner.restart();
+
+    for(PartialPath sg : realMap.keySet()){
+      assertEquals(realMap.getOrDefault(sg, Collections.emptySet()), hashVirtualPartitioner.storageGroupToDevice(sg));
+    }
+  }
+}