You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2020/01/13 07:35:31 UTC

[incubator-iotdb] 01/02: fix bugs

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch nvmlogging
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 85da2abaea6ee98dbe67951a354e552d10b38970
Author: mdf369 <95...@qq.com>
AuthorDate: Fri Jan 10 10:04:56 2020 +0800

    fix bugs
---
 .../engine/storagegroup/StorageGroupProcessor.java |  3 +++
 .../java/org/apache/iotdb/db/nvm/PerfMonitor.java  |  2 +-
 .../nvm/metadata/{SpaceCount.java => Counter.java} | 14 ++++++++---
 .../nvm/recover/NVMMemtableRecoverPerformer.java   |  6 +++++
 .../iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java |  7 ++----
 .../apache/iotdb/db/nvm/space/NVMDataSpace.java    | 19 ++++++++-------
 .../apache/iotdb/db/nvm/space/NVMSpaceManager.java | 27 +++++++++++++++-------
 .../db/nvm/space/NVMSpaceMetadataManager.java      | 25 ++++++++------------
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  6 +++++
 .../db/utils/datastructure/AbstractTVList.java     |  2 +-
 .../iotdb/db/utils/datastructure/NVMTVList.java    | 20 ++++++++++++++--
 .../writelog/recover/TsFileRecoverPerformer.java   |  6 +++++
 12 files changed, 91 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index fc80ba8..49da491 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.nvm.PerfMonitor;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -207,7 +208,9 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
+    long time = System.currentTimeMillis();
     recover();
+    PerfMonitor.add("StoragaGroupProcessor.recocer", System.currentTimeMillis() - time);
   }
 
   private void recover() throws StorageGroupProcessorException {
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/PerfMonitor.java b/server/src/main/java/org/apache/iotdb/db/nvm/PerfMonitor.java
index 5a4632b..6784e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/PerfMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/PerfMonitor.java
@@ -39,7 +39,7 @@ public class PerfMonitor {
 
     @Override
     public String toString() {
-      return name + ":\t" + timeLen + "\t" + count;
+      return name + ":\t\t\t" + timeLen + "\t\t\t" + count;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/Counter.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java
rename to server/src/main/java/org/apache/iotdb/db/nvm/metadata/Counter.java
index 951a3e6..da9edb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/Counter.java
@@ -2,12 +2,20 @@ package org.apache.iotdb.db.nvm.metadata;
 
 import java.io.IOException;
 
-public class SpaceCount extends NVMSpaceMetadata {
+public class Counter extends NVMSpaceMetadata {
 
-  public SpaceCount() throws IOException {
+  public Counter() throws IOException {
   }
 
-  public void put(int v) {
+  public void increment() {
+    put(get() + 1);
+  }
+
+  public void decrement() {
+    put(get() - 1);
+  }
+
+  private void put(int v) {
     space.putInt(0, v);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
index 73c51b4..8829f9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
@@ -13,9 +13,13 @@ import org.apache.iotdb.db.nvm.space.NVMDataSpace;
 import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
 import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NVMMemtableRecoverPerformer {
 
+  private static final Logger logger = LoggerFactory.getLogger(NVMMemtableRecoverPerformer.class);
+
   private final static NVMMemtableRecoverPerformer INSTANCE = new NVMMemtableRecoverPerformer();
 
   private NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
@@ -44,6 +48,8 @@ public class NVMMemtableRecoverPerformer {
       throws IOException {
     Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>> dataMap = new HashMap<>();
     List<Integer> validTimeSpaceIndexList = metadataManager.getValidTimeSpaceIndexList();
+    logger.debug("Valid time space index num: {}", validTimeSpaceIndexList.size());
+
     for (Integer timeSpaceIndex : validTimeSpaceIndexList) {
       int valueSpaceIndex = metadataManager.getValueSpaceIndexByTimeSpaceIndex(timeSpaceIndex);
       NVMDataSpace timeSpace = spaceManager.getNVMDataSpaceByIndex(timeSpaceIndex);
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
index 8bf3e2e..aca3c81 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
@@ -34,14 +34,14 @@ public class NVMPrimitiveArrayPool {
 
   private NVMPrimitiveArrayPool() {}
 
-  public synchronized NVMDataSpace getPrimitiveDataListByType(TSDataType dataType) {
+  public synchronized NVMDataSpace getPrimitiveDataListByType(TSDataType dataType, boolean isTime) {
     long time = System.currentTimeMillis();
     ArrayDeque<NVMDataSpace> dataListQueue = primitiveArraysMap.computeIfAbsent(dataType, k ->new ArrayDeque<>());
     NVMDataSpace nvmSpace = dataListQueue.poll();
 
     long size = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
     if (nvmSpace == null) {
-      nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * ARRAY_SIZE, dataType);
+      nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * ARRAY_SIZE, dataType, isTime);
     }
 
     PerfMonitor.add("NVM.getDataList", System.currentTimeMillis() - time);
@@ -49,10 +49,7 @@ public class NVMPrimitiveArrayPool {
   }
 
   public synchronized void release(NVMDataSpace nvmSpace, TSDataType dataType) {
-    // TODO freeslotmap?
-
     primitiveArraysMap.get(dataType).add(nvmSpace);
-    NVMSpaceMetadataManager.getInstance().unregisterSpace(nvmSpace);
   }
 
 //  /**
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
index b600e87..d515593 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
@@ -11,36 +11,35 @@ public class NVMDataSpace extends NVMSpace {
   private TSDataType dataType;
   private int unitSize;
 
-  NVMDataSpace(long offset, long size, ByteBuffer byteBuffer, int index, TSDataType dataType) {
+  NVMDataSpace(long offset, long size, ByteBuffer byteBuffer, int index, TSDataType dataType, boolean refresh) {
     super(offset, size, byteBuffer);
     this.index = index;
     this.dataType = dataType;
     unitSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
+
+    if (refresh) {
+      refreshData();
+    }
   }
 
   public int getUnitNum() {
     return (int) (size / unitSize);
   }
   
-  public void refreshData() {
-    // TODO only for Long
+  private void refreshData() {
     for (int i = 0; i < size / NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT64); i++) {
-      byteBuffer.putLong(i, INVALID_VALUE);
+      setData(i, INVALID_VALUE);
     }
   }
 
-  /**
-   * for Long only
-   * @return
-   */
   public int getValidUnitNum() {
+    // TODO only for time
     int count = 0;
     while (true) {
-      long v = byteBuffer.getLong(count);
+      long v = (long) getData(count);
       if (v == INVALID_VALUE) {
         break;
       }
-
       count++;
     }
     return count;
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
index 119881d..d29a62f 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
@@ -59,22 +59,20 @@ public class NVMSpaceManager {
   }
 
   public synchronized NVMSpace allocateSpace(long size) throws IOException {
-    logger.debug("Try to allocate NVMSpace from {} to {}", curOffset, curOffset + size);
+    logger.trace("Try to allocate NVMSpace from {} to {}", curOffset, curOffset + size);
     NVMSpace nvmSpace = new NVMSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size));
     curOffset += size;
     return nvmSpace;
   }
 
-  public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType) {
+  public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType, boolean isTime) {
     checkIsFull();
 
     try {
-      logger.debug("Try to allocate NVMDataSpace from {} to {}", curOffset, curOffset + size);
+      logger.trace("Try to allocate NVMDataSpace from {} to {}", curOffset, curOffset + size);
       int index = curDataSpaceIndex.getAndIncrement();
       NVMDataSpace nvmSpace = new NVMDataSpace(
-          curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType);
-      nvmSpace.refreshData();
-      metadataManager.updateCount(curDataSpaceIndex.get());
+          curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType, isTime);
       curOffset += size;
       return nvmSpace;
     } catch (IOException e) {
@@ -101,8 +99,8 @@ public class NVMSpaceManager {
   }
 
   private synchronized NVMDataSpace recoverData(long offset, long size, int index, TSDataType dataType) throws IOException {
-    logger.debug("Try to recover NVMSpace from {} to {}", offset, offset + size);
-    NVMDataSpace nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), index, dataType);
+    logger.trace("Try to recover NVMSpace from {} to {}", offset, offset + size);
+    NVMDataSpace nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), index, dataType, false);
     return nvmSpace;
   }
 
@@ -136,4 +134,17 @@ public class NVMSpaceManager {
     }
     return size;
   }
+
+  public static void main(String[] args) throws IOException {
+    String nvmDir = IoTDBDescriptor.getInstance().getConfig().getNvmDir();
+    String nvmFilePath = nvmDir + File.separatorChar + NVM_FILE_NAME;
+    File nvmDirFile = FSFactoryProducer.getFSFactory().getFile(nvmDir);
+    nvmDirFile.mkdirs();
+    FileChannel nvmFileChannel = new RandomAccessFile(nvmFilePath, "rw").getChannel();
+
+    ByteBuffer byteBuffer = nvmFileChannel.map(MapMode.READ_WRITE, 0, 4);
+    for (int i = 0; i < 1; i++) {
+      System.out.println(byteBuffer.getInt(i));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
index d6d9d02..0373433 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
@@ -1,12 +1,10 @@
 package org.apache.iotdb.db.nvm.space;
 
-import static org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSPACE_NUM_MAX;
-
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
 import org.apache.iotdb.db.nvm.metadata.OffsetMemo;
-import org.apache.iotdb.db.nvm.metadata.SpaceCount;
+import org.apache.iotdb.db.nvm.metadata.Counter;
 import org.apache.iotdb.db.nvm.metadata.SpaceStatusBitMap;
 import org.apache.iotdb.db.nvm.metadata.TimeValueMapper;
 import org.apache.iotdb.db.nvm.metadata.TimeseriesTimeIndexMapper;
@@ -16,7 +14,7 @@ public class NVMSpaceMetadataManager {
 
   private final static NVMSpaceMetadataManager INSTANCE = new NVMSpaceMetadataManager();
 
-  private SpaceCount spaceCount;
+  private Counter validTimeSpaceCounter;
   private SpaceStatusBitMap spaceStatusBitMap;
   private OffsetMemo offsetMemo;
   private DataTypeMemo dataTypeMemo;
@@ -26,7 +24,7 @@ public class NVMSpaceMetadataManager {
   private NVMSpaceMetadataManager() {}
 
   public void init() throws IOException {
-    spaceCount = new SpaceCount();
+    validTimeSpaceCounter = new Counter();
     spaceStatusBitMap = new SpaceStatusBitMap();
     offsetMemo = new OffsetMemo();
     dataTypeMemo = new DataTypeMemo();
@@ -38,17 +36,10 @@ public class NVMSpaceMetadataManager {
     return INSTANCE;
   }
 
-  public void updateCount(int v) {
-    spaceCount.put(v);
-  }
-
-  public int getCount() {
-    return spaceCount.get();
-  }
-
   public void registerTVSpace(NVMDataSpace timeSpace, NVMDataSpace valueSpace, String sgId, String deviceId, String measurementId) {
     int timeSpaceIndex = timeSpace.getIndex();
     int valueSpaceIndex = valueSpace.getIndex();
+    validTimeSpaceCounter.increment();
     spaceStatusBitMap.setUse(timeSpaceIndex, true);
     spaceStatusBitMap.setUse(valueSpaceIndex, false);
     offsetMemo.set(timeSpaceIndex, timeSpace.getOffset());
@@ -61,12 +52,14 @@ public class NVMSpaceMetadataManager {
         .mapTimeIndexToTimeSeries(timeSpaceIndex, sgId, deviceId, measurementId);
   }
 
-  public void unregisterSpace(NVMDataSpace space) {
-    spaceStatusBitMap.setFree(space.getIndex());
+  public void unregisterTVSpace(NVMDataSpace timeSpace, NVMDataSpace valueSpace) {
+    validTimeSpaceCounter.decrement();
+    spaceStatusBitMap.setFree(timeSpace.getIndex());
+    spaceStatusBitMap.setFree(valueSpace.getIndex());
   }
 
   public List<Integer> getValidTimeSpaceIndexList() {
-    return spaceStatusBitMap.getValidTimeSpaceIndexList(getCount() / 2);
+    return spaceStatusBitMap.getValidTimeSpaceIndexList(validTimeSpaceCounter.get());
   }
 
   public int getValueSpaceIndexByTimeSpaceIndex(int timeSpaceIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 2628ba1..c2611b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.nvm.PerfMonitor;
 import org.apache.iotdb.db.nvm.recover.NVMMemtableRecoverPerformer;
 import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
 import org.apache.iotdb.db.rescon.TVListAllocator;
@@ -85,8 +86,13 @@ public class IoTDB implements IoTDBMBean {
     setUncaughtExceptionHandler();
 
     if (IoTDBDescriptor.getInstance().getConfig().isEnableNVM()) {
+      long time = System.currentTimeMillis();
       NVMSpaceManager.getInstance().init();
+      PerfMonitor.add("NVMSpaceManager.init", System.currentTimeMillis() - time);
+
+      time = System.currentTimeMillis();
       NVMMemtableRecoverPerformer.getInstance().init();
+      PerfMonitor.add("NVMMemtableRecoverPerformer.init", System.currentTimeMillis() - time);
     }
 
     initMManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
index 14c5b64..f040090 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
@@ -16,7 +16,7 @@ public abstract class AbstractTVList {
   /**
    * this field is effective only in the Tvlist in a RealOnlyMemChunk.
    */
-  private long timeOffset = Long.MIN_VALUE;
+  protected long timeOffset = Long.MIN_VALUE;
 
   protected long pivotTime;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
index 0eda8b8..a71bd29 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
@@ -54,6 +54,22 @@ public abstract class NVMTVList extends AbstractTVList {
   }
 
   @Override
+  public void clear() {
+    size = 0;
+    timeOffset = Long.MIN_VALUE;
+    sorted = true;
+    minTime = Long.MIN_VALUE;
+
+    for (int i = 0; i < timestamps.size(); i++) {
+      NVMSpaceMetadataManager.getInstance().unregisterTVSpace(timestamps.get(i), values.get(i));
+    }
+    clearTime();
+    clearSortedTime();
+    clearValue();
+    clearSortedValue();
+  }
+
+  @Override
   protected void clearValue() {
     if (values != null) {
       for (NVMDataSpace valueSpace : values) {
@@ -66,7 +82,7 @@ public abstract class NVMTVList extends AbstractTVList {
   @Override
   protected NVMDataSpace expandValues() {
     NVMDataSpace dataSpace = NVMPrimitiveArrayPool
-        .getInstance().getPrimitiveDataListByType(dataType);
+        .getInstance().getPrimitiveDataListByType(dataType, false);
     values.add(dataSpace);
     return dataSpace;
   }
@@ -139,7 +155,7 @@ public abstract class NVMTVList extends AbstractTVList {
   protected void checkExpansion() {
     if ((size % ARRAY_SIZE) == 0) {
       NVMDataSpace valueSpace = expandValues();
-      NVMDataSpace timeSpace = NVMPrimitiveArrayPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64);
+      NVMDataSpace timeSpace = NVMPrimitiveArrayPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64, true);
       timestamps.add(timeSpace);
       NVMSpaceMetadataManager.getInstance().registerTVSpace(timeSpace, valueSpace, sgId, deviceId, measurementId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 8d86590..70d6a1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
+import org.apache.iotdb.db.nvm.PerfMonitor;
 import org.apache.iotdb.db.nvm.memtable.NVMPrimitiveMemTable;
 import org.apache.iotdb.db.nvm.recover.NVMMemtableRecoverPerformer;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -145,9 +146,13 @@ public class TsFileRecoverPerformer {
     }
 
     // recover data in memory
+    long time;
     if (IoTDBDescriptor.getInstance().getConfig().isEnableNVM()) {
+      time = System.currentTimeMillis();
       reloadNVMData(restorableTsFileIOWriter);
+      PerfMonitor.add("TsFileRecoverPerformer.reloadNVMData", System.currentTimeMillis() - time);
     } else {
+      time = System.currentTimeMillis();
       redoLogs(restorableTsFileIOWriter);
 
       // clean logs
@@ -158,6 +163,7 @@ public class TsFileRecoverPerformer {
       } catch (IOException e) {
         throw new StorageGroupProcessorException(e);
       }
+      PerfMonitor.add("TsFileRecoverPerformer.redoAndCleanLogs", System.currentTimeMillis() - time);
     }
   }