You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/29 03:54:34 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (94ad2c8 -> 5cc8554)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 94ad2c8  remove check TVList size in MemTableFlushTaskV2
     new 5e41f43  add the statistic for the number of series in each storage group; and set the ChunkBufferPool capacity as (core = the maximal number of series of storage groups, max = 2 * core)
     new 5cc8554  solving conflicts

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/engine/memtable/ChunkBufferPool.java  | 34 ++++++++++---------
 .../java/org/apache/iotdb/db/metadata/MGraph.java  | 16 +++++++++
 .../org/apache/iotdb/db/metadata/MManager.java     | 38 ++++++++++++++++++++++
 .../iotdb/db/metadata/MManagerBasicTest.java       | 24 ++++++++++++++
 4 files changed, 97 insertions(+), 15 deletions(-)


[incubator-iotdb] 02/02: solving conflicts

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 5cc8554b61927e226f6af513564b44db3b8f91aa
Merge: 5e41f43 94ad2c8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Jun 29 11:54:24 2019 +0800

    solving conflicts

 .../iotdb/db/engine/memtable/AbstractMemTable.java | 14 ++++-----
 .../iotdb/db/engine/memtable/ChunkBufferPool.java  | 15 +---------
 .../iotdb/db/engine/memtable/EmptyMemTable.java    |  2 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |  4 +--
 .../db/utils/datastructure/TVListAllocator.java    | 34 +++++++++++++++++-----
 5 files changed, 37 insertions(+), 32 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index e40a7cf,2539187..b481837
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@@ -37,20 -36,12 +37,7 @@@ public class ChunkBufferPool 
    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkBufferPool.class);
  
    private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
- 
- //  /**
- //   * the number of required FlushTasks is no more than {@linkplain MemTablePool}.
- //   */
- //  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
- //      .getMemtableNumber();
--
--  /**
-    * The number of chunkBuffer in the pool is less than the memtable number by default.
-    * Once the maximal number of time series is greater than the capacity, the capacity will be updated
-    * to the maximal number.
 -   * the number of required FlushTasks is no more than {@linkplain MemTablePool}.
--   */
-  //private volatile int capacity = IoTDBDescriptor.getInstance().getConfig().getMemtableNumber();
 -  private static final int capacity = 2208000;
--
++  
    private int size = 0;
  
    private static final int WAIT_TIME = 2000;


[incubator-iotdb] 01/02: add the statistic for the number of series in each storage group; and set the ChunkBufferPool capacity as (core = the maximal number of series of storage groups, max = 2 * core)

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 5e41f43df6b64b6b9d9858a0d2706e9ac02e5d62
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Jun 29 11:52:09 2019 +0800

    add the statistic for the number of series in each storage group; and set the ChunkBufferPool capacity as (core = the maximal number of series of storage groups, max = 2 * core)
---
 .../iotdb/db/engine/memtable/ChunkBufferPool.java  | 40 +++++++++++++++-------
 .../java/org/apache/iotdb/db/metadata/MGraph.java  | 16 +++++++++
 .../org/apache/iotdb/db/metadata/MManager.java     | 38 ++++++++++++++++++++
 .../iotdb/db/metadata/MManagerBasicTest.java       | 24 +++++++++++++
 4 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index 4b371e4..e40a7cf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@ -17,6 +17,7 @@ package org.apache.iotdb.db.engine.memtable;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -37,11 +38,18 @@ public class ChunkBufferPool {
 
   private static final Deque<ChunkBuffer> availableChunkBuffer = new ArrayDeque<>();
 
+//  /**
+//   * the number of required FlushTasks is no more than {@linkplain MemTablePool}.
+//   */
+//  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
+//      .getMemtableNumber();
+
   /**
-   * the number of required FlushTasks is no more than {@linkplain MemTablePool}.
+   * The number of chunkBuffer in the pool is less than the memtable number by default.
+   * Once the maximal number of time series is greater than the capacity, the capacity will be updated
+   * to the maximal number.
    */
-  private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
-      .getMemtableNumber();
+ //private volatile int capacity = IoTDBDescriptor.getInstance().getConfig().getMemtableNumber();
 
   private int size = 0;
 
@@ -52,6 +60,10 @@ public class ChunkBufferPool {
 
   public ChunkBuffer getEmptyChunkBuffer(Object applier, MeasurementSchema schema) {
     synchronized (availableChunkBuffer) {
+      //we use the memtable number * maximal series number in one StorageGroup * 2 as the capacity
+      int capacity  =
+          2 * MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
+              .getInstance().getConfig().getMemtableNumber();
       if (availableChunkBuffer.isEmpty() && size < capacity) {
         size++;
 //        LOGGER.info("For fask, generated a new ChunkBuffer for {}, system ChunkBuffer size: {}, stack size: {}",
@@ -61,7 +73,7 @@ public class ChunkBufferPool {
 //        LOGGER
 //            .info("ReusableChunkBuffer size: {}, stack size: {}, then get a ChunkBuffer from stack for {}",
 //                size, availableChunkBuffer.size(), applier);
-        ChunkBuffer chunkBuffer =  availableChunkBuffer.pop();
+        ChunkBuffer chunkBuffer = availableChunkBuffer.pop();
         chunkBuffer.reInit(schema);
         return chunkBuffer;
       }
@@ -80,7 +92,8 @@ public class ChunkBufferPool {
         } catch (InterruptedException e) {
           LOGGER.error("{} fails to wait fot ReusableChunkBuffer {}, continue to wait", applier, e);
         }
-        LOGGER.info("{} has waited for a ReusableChunkBuffer for {}ms", applier, waitCount++ * WAIT_TIME);
+        LOGGER.info("{} has waited for a ReusableChunkBuffer for {}ms", applier,
+            waitCount++ * WAIT_TIME);
       }
     }
   }
@@ -88,19 +101,22 @@ public class ChunkBufferPool {
   public void putBack(ChunkBuffer chunkBuffer) {
     synchronized (availableChunkBuffer) {
       chunkBuffer.reset();
-      availableChunkBuffer.push(chunkBuffer);
+      //we use the memtable number * maximal series number in one StorageGroup as the capacity
+      int capacity  =
+          MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
+              .getInstance().getConfig().getMemtableNumber();
+      if (size > capacity) {
+        size --;
+      } else {
+        availableChunkBuffer.push(chunkBuffer);
+      }
       availableChunkBuffer.notify();
 //      LOGGER.info("a chunk buffer returned, stack size {}", availableChunkBuffer.size());
     }
   }
 
   public void putBack(ChunkBuffer chunkBuffer, String storageGroup) {
-    synchronized (availableChunkBuffer) {
-      chunkBuffer.reset();
-      availableChunkBuffer.push(chunkBuffer);
-      availableChunkBuffer.notify();
-//      LOGGER.info("{} return a chunk buffer, stack size {}", storageGroup, availableChunkBuffer.size());
-    }
+    putBack(chunkBuffer);
   }
 
   public static ChunkBufferPool getInstance() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
index 70e9b17..b0aea4f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.iotdb.db.exception.MetadataArgsErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -378,4 +379,19 @@ public class MGraph implements Serializable {
   public static String combineMetadataInStrings(String[] metadatas) {
     return MTree.combineMetadataInStrings(metadatas);
   }
+
+  /**
+   * Counts the leaf nodes (series) under each storage group.
+   * @return storage group name -> the series number
+   */
+  public Map<String, Integer> countSeriesNumberInEachStorageGroup() throws PathErrorException {
+    Map<String, Integer> res = new HashMap<>();
+    Set<String> storageGroups = this.getAllStorageGroup();
+    for (String sg : storageGroups) {
+      this.getNumSchemaMapForOneFileNode(sg);
+      MNode node = mtree.getNodeByPath(sg);
+      res.put(sg, node.getLeafCount());
+    }
+    return res;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 6d6c224..0680025 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -76,6 +76,9 @@ public class MManager {
   private RandomDeleteCache<String, PathCheckRet> checkAndGetDataTypeCache;
   private RandomDeleteCache<String, MNode> mNodeCache;
 
+  private Map<String, Integer> seriesNumberInStorageGroups = new HashMap<>();
+  private int maxSeriesNumberAmongStorageGroup;
+
   private MManager() {
     metadataDirPath = IoTDBDescriptor.getInstance().getConfig().getMetadataDir();
     if (metadataDirPath.length() > 0
@@ -133,14 +136,23 @@ public class MManager {
     lock.writeLock().lock();
     File dataFile = new File(datafilePath);
     File logFile = new File(logFilePath);
+
     try {
       if (dataFile.exists()) {
         initFromDataFile(dataFile);
       } else {
         initFromLog(logFile);
       }
+      seriesNumberInStorageGroups = mgraph.countSeriesNumberInEachStorageGroup();
+      if (seriesNumberInStorageGroups.isEmpty()) {
+        maxSeriesNumberAmongStorageGroup = 0;
+      } else {
+        maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
+            .max(Integer::compareTo).get();
+      }
       logWriter = new BufferedWriter(new FileWriter(logFile, true));
       writeToLog = true;
+
     } catch (PathErrorException | MetadataArgsErrorException
         | ClassNotFoundException | IOException e) {
       mgraph = new MGraph(ROOT_NAME);
@@ -266,6 +278,13 @@ public class MManager {
     lock.writeLock().lock();
     try {
       mgraph.addPathToMTree(path, dataType, encoding, compressor, props);
+      String storageName = mgraph.getFileNameByPath(path);
+      int size = seriesNumberInStorageGroups.get(mgraph.getFileNameByPath(path));
+      seriesNumberInStorageGroups
+          .put(storageName, size + 1);
+      if (size + 1 > maxSeriesNumberAmongStorageGroup) {
+        maxSeriesNumberAmongStorageGroup = size + 1;
+      }
       if (writeToLog) {
         initLogStream();
         logWriter.write(String.format("%s,%s,%s,%s,%s", MetadataOperationType.ADD_PATH_TO_MTREE,
@@ -316,6 +335,20 @@ public class MManager {
         logWriter.newLine();
         logWriter.flush();
       }
+      String storageGroup = getFileNameByPath(path);
+      int size = seriesNumberInStorageGroups.get(storageGroup);
+      seriesNumberInStorageGroups.put(storageGroup, size - 1);
+      if (size == maxSeriesNumberAmongStorageGroup) {
+        //recalculate
+        if (seriesNumberInStorageGroups.isEmpty()) {
+          maxSeriesNumberAmongStorageGroup = 0;
+        } else {
+          maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
+              .max(Integer::compareTo).get();
+        }
+      } else {
+        maxSeriesNumberAmongStorageGroup--;
+      }
       return dataFileName;
     } finally {
       lock.writeLock().unlock();
@@ -332,6 +365,7 @@ public class MManager {
       checkAndGetDataTypeCache.clear();
       mNodeCache.clear();
       mgraph.setStorageLevel(path);
+      seriesNumberInStorageGroups.put(path, 0);
       if (writeToLog) {
         initLogStream();
         logWriter.write(MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE + "," + path);
@@ -1025,6 +1059,10 @@ public class MManager {
     }
   }
 
+  public int getMaximalSeriesNumberAmongStorageGroups() {
+    return maxSeriesNumberAmongStorageGroup;
+  }
+
   private static class MManagerHolder {
     private MManagerHolder(){
       //allowed to do nothing
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 4fbf8bf..61c08d6 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -344,4 +344,28 @@ public class MManagerBasicTest {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testMaximalSeriesNumberAmongStorageGroup() throws IOException, PathErrorException {
+    MManager manager = MManager.getInstance();
+    assertEquals(0, manager.getMaximalSeriesNumberAmongStorageGroups());
+    manager.setStorageLevelToMTree("root.laptop");
+    assertEquals(0, manager.getMaximalSeriesNumberAmongStorageGroups());
+    manager.addPathToMTree("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
+        CompressionType.GZIP, null);
+    manager.addPathToMTree("root.laptop.d1.s2", TSDataType.INT32, TSEncoding.PLAIN,
+        CompressionType.GZIP, null);
+    assertEquals(2, manager.getMaximalSeriesNumberAmongStorageGroups());
+    manager.setStorageLevelToMTree("root.vehicle");
+    manager.addPathToMTree("root.vehicle.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
+        CompressionType.GZIP, null);
+    assertEquals(2, manager.getMaximalSeriesNumberAmongStorageGroups());
+
+    manager.deletePathFromMTree("root.laptop.d1.s1");
+    assertEquals(1, manager.getMaximalSeriesNumberAmongStorageGroups());
+    manager.deletePathFromMTree("root.laptop.d1.s2");
+    assertEquals(1, manager.getMaximalSeriesNumberAmongStorageGroups());
+
+
+  }
 }