You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2020/01/02 09:01:42 UTC
[incubator-iotdb] branch nvmlogging updated: redo clone and add sg, device, measurement in NVMTVList
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch nvmlogging
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/nvmlogging by this push:
new 1d50933 redo clone and add sg, device, measurement in NVMTVList
1d50933 is described below
commit 1d50933ea50603b48a2042caccefc65e0023c9a6
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Jan 2 17:01:18 2020 +0800
redo clone and add sg, device, measurement in NVMTVList
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +-
.../iotdb/db/engine/flush/NotifyFlushMemTable.java | 6 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 16 +-
.../db/engine/memtable/IWritableMemChunk.java | 2 +-
.../db/engine/memtable/PrimitiveMemTable.java | 11 +-
.../engine/storagegroup/StorageGroupProcessor.java | 4 +-
.../db/engine/storagegroup/TsFileProcessor.java | 8 +-
.../db/nvm/memtable/NVMPrimitiveMemTable.java | 15 +-
.../iotdb/db/nvm/memtable/NVMWritableMemChunk.java | 3 +-
.../apache/iotdb/db/nvm/metadata/DataTypeMemo.java | 26 +++
.../iotdb/db/nvm/metadata/FreeSpaceBitMap.java | 27 +++
.../iotdb/db/nvm/metadata/NVMSpaceMetadata.java | 12 ++
.../apache/iotdb/db/nvm/metadata/TSDataMap.java | 23 +++
.../iotdb/db/nvm/metadata/TimeValueMapper.java | 15 ++
.../nvm/recover/NVMMemtableRecoverPerformer.java | 49 ++++++
.../iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java | 43 +++--
.../iotdb/db/nvm/space/INVMSpaceManager.java | 9 -
.../apache/iotdb/db/nvm/space/NVMDataSpace.java | 99 +++++++++++
.../org/apache/iotdb/db/nvm/space/NVMSpace.java | 28 +++
.../apache/iotdb/db/nvm/space/NVMSpaceManager.java | 188 +++++++++------------
.../org/apache/iotdb/db/rescon/MemTablePool.java | 6 +-
.../apache/iotdb/db/rescon/TVListAllocator.java | 20 ++-
.../datastructure/AbstractTVList.java | 5 +-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 8 +-
.../db/utils/datastructure/BooleanTVList.java | 12 +-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 12 +-
.../iotdb/db/utils/datastructure/FloatTVList.java | 12 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 12 +-
.../iotdb/db/utils/datastructure/LongTVList.java | 12 +-
.../datastructure/NVMBooleanTVList.java | 20 +--
.../datastructure/NVMDoubleTVList.java | 16 +-
.../datastructure/NVMFloatTVList.java | 16 +-
.../{nvm => utils}/datastructure/NVMIntTVList.java | 16 +-
.../datastructure/NVMLongTVList.java | 16 +-
.../db/{nvm => utils}/datastructure/NVMTVList.java | 61 ++++---
.../iotdb/db/utils/datastructure/TVList.java | 1 -
.../writelog/recover/TsFileRecoverPerformer.java | 11 +-
37 files changed, 571 insertions(+), 271 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 0f39cac..8873dc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
-import org.apache.iotdb.db.nvm.datastructure.AbstractTVList;
+import org.apache.iotdb.db.utils.datastructure.AbstractTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 099a496..50f27ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -29,8 +29,12 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
*/
public class NotifyFlushMemTable extends AbstractMemTable {
+ public NotifyFlushMemTable(String sgId) {
+ super(sgId);
+ }
+
@Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
+ protected IWritableMemChunk genMemSeries(String deviceId, String measurementId, TSDataType dataType) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 08619ea..f9d88af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -46,12 +47,15 @@ public abstract class AbstractMemTable implements IMemTable {
private long memSize = 0;
- public AbstractMemTable() {
- this.memTableMap = new HashMap<>();
+ protected String storageGroupId;
+
+ public AbstractMemTable(String sgId) {
+ this(new HashMap<>(), sgId);
}
- public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+ public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap, String sgId) {
this.memTableMap = memTableMap;
+ this.storageGroupId = sgId;
}
@Override
@@ -68,19 +72,19 @@ public abstract class AbstractMemTable implements IMemTable {
return memTableMap.containsKey(deviceId) && memTableMap.get(deviceId).containsKey(measurement);
}
- private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measurement,
+ protected IWritableMemChunk createIfNotExistAndGet(String deviceId, String measurement,
TSDataType dataType) {
if (!memTableMap.containsKey(deviceId)) {
memTableMap.put(deviceId, new HashMap<>());
}
Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
if (!memSeries.containsKey(measurement)) {
- memSeries.put(measurement, genMemSeries(dataType));
+ memSeries.put(measurement, genMemSeries(deviceId, measurement, dataType));
}
return memSeries.get(measurement);
}
- protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
+ protected abstract IWritableMemChunk genMemSeries(String deviceId, String measurementId, TSDataType dataType);
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 0d9b183..a3b281e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.engine.memtable;
import java.util.List;
-import org.apache.iotdb.db.nvm.datastructure.AbstractTVList;
+import org.apache.iotdb.db.utils.datastructure.AbstractTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index d21b889..003d51c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -29,15 +29,16 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class PrimitiveMemTable extends AbstractMemTable {
- public PrimitiveMemTable() {
+ public PrimitiveMemTable(String sgId) {
+ super(sgId);
}
- public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
- super(memTableMap);
+ public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap, String sgId) {
+ super(memTableMap, sgId);
}
@Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
+ protected IWritableMemChunk genMemSeries(String deviceId, String measurementId, TSDataType dataType) {
return new WritableMemChunk(dataType,
(TVList) TVListAllocator.getInstance().allocate(dataType, false));
}
@@ -46,7 +47,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
public IMemTable copy() {
Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
- return new PrimitiveMemTable(newMap);
+ return new PrimitiveMemTable(newMap, storageGroupId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2cf9e8f..fc80ba8 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -290,7 +290,7 @@ public class StorageGroupProcessor {
for (TsFileResource tsFileResource : tsFiles) {
sequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
- , schema, versionController, tsFileResource, false);
+ , schema, versionController, tsFileResource, false, storageGroupName);
recoverPerformer.recover();
tsFileResource.setClosed(true);
}
@@ -302,7 +302,7 @@ public class StorageGroupProcessor {
unSequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
schema,
- versionController, tsFileResource, true);
+ versionController, tsFileResource, true, storageGroupName);
recoverPerformer.recover();
tsFileResource.setClosed(true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1c02aad..93759d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -144,7 +144,7 @@ public class TsFileProcessor {
public boolean insert(InsertPlan insertPlan) throws QueryProcessException {
if (workMemTable == null) {
- workMemTable = MemTablePool.getInstance().getAvailableMemTable(this, useNVM);
+ workMemTable = MemTablePool.getInstance().getAvailableMemTable(this, useNVM, storageGroupName);
}
// insert insertPlan to the work memtable
@@ -174,7 +174,7 @@ public class TsFileProcessor {
Integer[] results) throws QueryProcessException {
if (workMemTable == null) {
- workMemTable = MemTablePool.getInstance().getAvailableMemTable(this, useNVM);
+ workMemTable = MemTablePool.getInstance().getAvailableMemTable(this, useNVM, storageGroupName);
}
// insert insertPlan to the work memtable
@@ -297,7 +297,7 @@ public class TsFileProcessor {
// To ensure there must be a flush thread serving this processor after the field `shouldClose`
// is set true, we need to generate a NotifyFlushMemTable as a signal task and submit it to
// the FlushManager.
- IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+ IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable(storageGroupName) : workMemTable;
if (logger.isDebugEnabled()) {
if (tmpMemTable.isSignalMemTable()) {
logger.debug(
@@ -325,7 +325,7 @@ public class TsFileProcessor {
IMemTable tmpMemTable;
flushQueryLock.writeLock().lock();
try {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+ tmpMemTable = workMemTable == null ? new NotifyFlushMemTable(storageGroupName) : workMemTable;
if (tmpMemTable.isSignalMemTable()) {
logger.debug("add a signal memtable into flushing memtable list when sync flush");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
index bce7fad..d4d583f 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
@@ -7,30 +7,31 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.nvm.datastructure.NVMTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class NVMPrimitiveMemTable extends AbstractMemTable {
- public NVMPrimitiveMemTable() {
+ public NVMPrimitiveMemTable(String sgId) {
+ super(sgId);
}
- public NVMPrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
- super(memTableMap);
+ public NVMPrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap, String sgId) {
+ super(memTableMap, sgId);
}
@Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
+ protected IWritableMemChunk genMemSeries(String deviceId, String measurementId, TSDataType dataType) {
return new NVMWritableMemChunk(dataType,
- (NVMTVList) TVListAllocator.getInstance().allocate(dataType, true));
+ (NVMTVList) TVListAllocator.getInstance().allocate(storageGroupId, deviceId, measurementId, dataType, true));
}
@Override
public IMemTable copy() {
Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
- return new NVMPrimitiveMemTable(newMap);
+ return new NVMPrimitiveMemTable(newMap, storageGroupId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
index 4c7f287..5e5092c 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
@@ -3,7 +3,7 @@ package org.apache.iotdb.db.nvm.memtable;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.nvm.datastructure.NVMTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary;
import org.apache.iotdb.db.utils.TsPrimitiveType.TsBoolean;
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
public class NVMWritableMemChunk implements IWritableMemChunk {
private static final Logger logger = LoggerFactory.getLogger(NVMWritableMemChunk.class);
+
private TSDataType dataType;
private NVMTVList list;
private List<TimeValuePair> sortedList;
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
new file mode 100644
index 0000000..97cbe79
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class DataTypeMemo extends NVMSpaceMetadata {
+
+ public DataTypeMemo(ByteBuffer byteBuffer) {
+ super(byteBuffer);
+ }
+
+ public void set(int index, TSDataType dataType) {
+ byteBuffer.putShort(index, dataType.serialize());
+ }
+
+ public List<TSDataType> getDataTypeList() {
+ List<TSDataType> dataTypeList = new ArrayList<>();
+ for (int i = 0; i < byteBuffer.capacity(); i++) {
+ TSDataType dataType = TSDataType.deserialize(byteBuffer.getShort(i));
+ dataTypeList.add(dataType);
+ }
+ return dataTypeList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
new file mode 100644
index 0000000..2b7967b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
@@ -0,0 +1,27 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+public class FreeSpaceBitMap extends NVMSpaceMetadata {
+
+ public FreeSpaceBitMap(ByteBuffer byteBuffer) {
+ super(byteBuffer);
+ }
+
+ public void update(int index, boolean setFree) {
+ byteBuffer.put(index, setFree ? (byte) 0 : (byte) 1);
+ }
+
+ public Set<Integer> getValidSpaceIndexSet() {
+ Set<Integer> freeSpaceIndexList = new HashSet<>();
+ for (int i = 0; i < byteBuffer.capacity(); i++) {
+ byte flag = byteBuffer.get(i);
+ if (flag == 1) {
+ freeSpaceIndexList.add(i);
+ }
+ }
+ return freeSpaceIndexList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
new file mode 100644
index 0000000..05c88a7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
@@ -0,0 +1,12 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.nio.ByteBuffer;
+
+public abstract class NVMSpaceMetadata {
+
+ protected ByteBuffer byteBuffer;
+
+ public NVMSpaceMetadata(ByteBuffer byteBuffer) {
+ this.byteBuffer = byteBuffer;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
new file mode 100644
index 0000000..d93e73b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class TSDataMap extends NVMSpaceMetadata {
+
+ public TSDataMap(ByteBuffer byteBuffer) {
+ super(byteBuffer);
+ }
+
+ public void addSpaceToTimeSeries(String sgId, String deviceId, String measurementId,
+ int timeSpaceIndex,
+ int valueSpaceIndex) {
+
+ }
+
+ public Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>> generateTSPathTVPairListMap() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
new file mode 100644
index 0000000..7f3f057
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.nio.ByteBuffer;
+
+public class TimeValueMapper extends NVMSpaceMetadata {
+
+ public TimeValueMapper(ByteBuffer byteBuffer) {
+ super(byteBuffer);
+ }
+
+ public void map(int timeSpaceIndex, int valueSpaceIndex) {
+ byteBuffer.putInt(timeSpaceIndex);
+ byteBuffer.putInt(valueSpaceIndex);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
new file mode 100644
index 0000000..afc33f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
@@ -0,0 +1,49 @@
+package org.apache.iotdb.db.nvm.recover;
+
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
+import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
+import org.apache.iotdb.db.nvm.metadata.TSDataMap;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class NVMMemtableRecoverPerformer {
+
+ private FileChannel nvmFileChannel;
+ private final MapMode MAP_MODE = MapMode.READ_WRITE;
+
+ /**
+ * metadata fields
+ */
+ private FreeSpaceBitMap freeSpaceBitMap;
+ private DataTypeMemo dataTypeMemo;
+ private TSDataMap tsDataMap;
+
+ public Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>> getValidTSPathTVPairListMap() {
+ Set<Integer> validSpaceIndexSet = freeSpaceBitMap.getValidSpaceIndexSet();
+ Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>> tsTVMap = tsDataMap.generateTSPathTVPairListMap();
+ for (Map<String, Map<String, List<Pair<Integer, Integer>>>> dmTVMap : tsTVMap.values()) {
+ for (Map<String, List<Pair<Integer, Integer>>> mTVMap : dmTVMap.values()) {
+ for (List<Pair<Integer, Integer>> tvList : mTVMap.values()) {
+ Iterator<Pair<Integer, Integer>> iterator = tvList.iterator();
+ while (iterator.hasNext()) {
+ Pair<Integer, Integer> tvPair = iterator.next();
+ if (!validSpaceIndexSet.contains(tvPair.left) || !validSpaceIndexSet.contains(tvPair.right)) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+ }
+ return tsTVMap;
+ }
+
+ public void reconstructMemtable(String sgId, PrimitiveMemTable memTable) {
+
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
index a9782ab..671822a 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
@@ -3,9 +3,8 @@ package org.apache.iotdb.db.nvm.rescon;
import java.util.ArrayDeque;
import java.util.EnumMap;
import org.apache.iotdb.db.nvm.PerfMonitor;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class NVMPrimitiveArrayPool {
@@ -13,7 +12,7 @@ public class NVMPrimitiveArrayPool {
/**
* data type -> Array<PrimitiveArray>
*/
- private static final EnumMap<TSDataType, ArrayDeque<NVMSpace>> primitiveArraysMap = new EnumMap<>(TSDataType.class);
+ private static final EnumMap<TSDataType, ArrayDeque<NVMDataSpace>> primitiveArraysMap = new EnumMap<>(TSDataType.class);
public static final int ARRAY_SIZE = 128;
@@ -32,40 +31,40 @@ public class NVMPrimitiveArrayPool {
private static final NVMPrimitiveArrayPool INSTANCE = new NVMPrimitiveArrayPool();
-
private NVMPrimitiveArrayPool() {}
- public synchronized NVMSpace getPrimitiveDataListByType(TSDataType dataType) {
+ public synchronized NVMDataSpace getPrimitiveDataListByType(TSDataType dataType) {
long time = System.currentTimeMillis();
- ArrayDeque<NVMSpace> dataListQueue = primitiveArraysMap.computeIfAbsent(dataType, k ->new ArrayDeque<>());
- NVMSpace nvmSpace = dataListQueue.poll();
+ ArrayDeque<NVMDataSpace> dataListQueue = primitiveArraysMap.computeIfAbsent(dataType, k ->new ArrayDeque<>());
+ NVMDataSpace nvmSpace = dataListQueue.poll();
long size = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
if (nvmSpace == null) {
- nvmSpace = NVMSpaceManager.getInstance().allocate(size * ARRAY_SIZE, dataType);
+ nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * ARRAY_SIZE, dataType);
}
+ NVMSpaceManager.getInstance().registerNVMDataSpace(nvmSpace);
PerfMonitor.add("NVM.getDataList", System.currentTimeMillis() - time);
return nvmSpace;
}
-
- public synchronized void release(NVMSpace nvmSpace, TSDataType dataType) {
+ public synchronized void release(NVMDataSpace nvmSpace, TSDataType dataType) {
// TODO freeslotmap?
primitiveArraysMap.get(dataType).add(nvmSpace);
+ NVMSpaceManager.getInstance().unregisterNVMDataSpace(nvmSpace);
}
- /**
- * @param size needed capacity
- * @return an array of primitive data arrays
- */
- public synchronized NVMSpace[] getDataListsByType(TSDataType dataType, int size) {
- int arrayNumber = (int) Math.ceil((float) size / (float) ARRAY_SIZE);
- NVMSpace[] nvmSpaces = new NVMSpace[arrayNumber];
- for (int i = 0; i < arrayNumber; i++) {
- nvmSpaces[i] = getPrimitiveDataListByType(dataType);
- }
- return nvmSpaces;
- }
+// /**
+// * @param size needed capacity
+// * @return an array of primitive data arrays
+// */
+// public synchronized NVMDataSpace[] getDataListsByType(TSDataType dataType, int size) {
+// int arrayNumber = (int) Math.ceil((float) size / (float) ARRAY_SIZE);
+// NVMDataSpace[] nvmSpaces = new NVMDataSpace[arrayNumber];
+// for (int i = 0; i < arrayNumber; i++) {
+// nvmSpaces[i] = getPrimitiveDataListByType(dataType);
+// }
+// return nvmSpaces;
+// }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/INVMSpaceManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/INVMSpaceManager.java
deleted file mode 100644
index de56fa4..0000000
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/INVMSpaceManager.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.iotdb.db.nvm.space;
-
-import java.io.IOException;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
-
-public interface INVMSpaceManager {
-
- NVMSpace allocate(long size) throws IOException;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
new file mode 100644
index 0000000..2ed0229
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
@@ -0,0 +1,99 @@
+package org.apache.iotdb.db.nvm.space;
+
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class NVMDataSpace extends NVMSpace {
+
+ private int index;
+ private TSDataType dataType;
+
+ NVMDataSpace(long offset, long size, ByteBuffer byteBuffer, int index, TSDataType dataType) {
+ super(offset, size, byteBuffer);
+ this.index = index;
+ this.dataType = dataType;
+ }
+
+ @Override
+ public NVMDataSpace clone() {
+ return new NVMDataSpace(offset, size, cloneByteBuffer(), index, dataType);
+ }
+
+ public ByteBuffer cloneByteBuffer() {
+ ByteBuffer clone = ByteBuffer.allocate(byteBuffer.capacity());
+ byteBuffer.rewind();
+ clone.put(byteBuffer);
+ byteBuffer.rewind();
+ clone.flip();
+ return clone;
+ }
+
+ public Object get(int index) {
+ int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
+ index *= objectSize;
+ Object object = null;
+ switch (dataType) {
+ case BOOLEAN:
+ object = byteBuffer.get(index);
+ break;
+ case INT32:
+ object = byteBuffer.getInt(index);
+ break;
+ case INT64:
+ object = byteBuffer.getLong(index);
+ break;
+ case FLOAT:
+ object = byteBuffer.getFloat(index);
+ break;
+ case DOUBLE:
+ object = byteBuffer.getDouble(index);
+ break;
+ case TEXT:
+ // TODO
+ break;
+ }
+ return object;
+ }
+
+ public void set(int index, Object object) {
+ int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
+ index *= objectSize;
+ switch (dataType) {
+ case BOOLEAN:
+ byteBuffer.put(index, (byte) object);
+ break;
+ case INT32:
+ byteBuffer.putInt(index, (int) object);
+ break;
+ case INT64:
+ byteBuffer.putLong(index, (long) object);
+ break;
+ case FLOAT:
+ byteBuffer.putFloat(index, (float) object);
+ break;
+ case DOUBLE:
+ byteBuffer.putDouble(index, (double) object);
+ break;
+ case TEXT:
+ // TODO
+ break;
+ }
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public Object toArray() {
+ int arraySize = (int) (size / NVMSpaceManager.getPrimitiveTypeByteSize(dataType));
+ Object[] array = new Object[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ array[i] = get(i);
+ }
+ return array;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpace.java
new file mode 100644
index 0000000..30c41c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpace.java
@@ -0,0 +1,28 @@
+package org.apache.iotdb.db.nvm.space;
+
+import java.nio.ByteBuffer;
+
+public class NVMSpace {
+
+ protected long offset;
+ protected long size;
+ protected ByteBuffer byteBuffer;
+
+ NVMSpace(long offset, long size, ByteBuffer byteBuffer) {
+ this.offset = offset;
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return byteBuffer;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
index 070f62e..5ae2dc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
@@ -1,13 +1,16 @@
package org.apache.iotdb.db.nvm.space;
+import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
+import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
+import org.apache.iotdb.db.nvm.metadata.TSDataMap;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -18,30 +21,67 @@ public class NVMSpaceManager {
private static final Logger logger = LoggerFactory.getLogger(NVMSpaceManager.class);
+ private static final String NVM_FILE_NAME = "nvmFile";
+ private static final int NVMSPACE_NUM_MAX = 1000000;
+
+ private static final long BITMAP_FIELD_OFFSET = 0L;
+ private static final long BITMAP_FIELD_BYTE_SIZE = Byte.BYTES * NVMSPACE_NUM_MAX;
+
+ private static final long DATATYPE_FIELD_OFFSET = BITMAP_FIELD_OFFSET + BITMAP_FIELD_BYTE_SIZE;
+ private static final long DATATYPE_FIELD_BYTE_SIZE = Short.BYTES * NVMSPACE_NUM_MAX;
+
+ private static final long TSID_FIELD_OFFSET = DATATYPE_FIELD_OFFSET + DATATYPE_FIELD_BYTE_SIZE;
+ private static final long TSID_FIELD_BYTE_SIZE = getPrimitiveTypeByteSize(TSDataType.INT64) * NVMSPACE_NUM_MAX;
+
+ private static final long TVMAP_FIELD_OFFSET = TSID_FIELD_OFFSET + TSID_FIELD_BYTE_SIZE;
+ private static final long TVMAP_FIELD_BYTE_SIZE = getPrimitiveTypeByteSize(TSDataType.INT32) * 2 * NVMSPACE_NUM_MAX;
+
+ private static final long DATA_FILED_OFFSET = TVMAP_FIELD_OFFSET + TVMAP_FIELD_BYTE_SIZE;
+
private final static NVMSpaceManager INSTANCE = new NVMSpaceManager();
- private String NVM_PATH;
+ private String nvmFilePath;
private FileChannel nvmFileChannel;
private final MapMode MAP_MODE = MapMode.READ_WRITE;
private long nvmSize;
- private long curOffset = 0L;
- private NVMSpaceManager() {
-// init();
- }
+ /**
+ * metadata fields
+ */
+ private FreeSpaceBitMap freeSpaceBitMap;
+ private DataTypeMemo dataTypeMemo;
+ private TSDataMap tsDataMap;
+
+ /**
+ * data field
+ */
+ private AtomicInteger curDataSpaceIndex = new AtomicInteger(0);
+ private long curDataOffset = DATA_FILED_OFFSET;
+
+ private NVMSpaceManager() {}
public void init() throws StartupException {
try {
- NVM_PATH = IoTDBDescriptor.getInstance().getConfig().getNvmDir() + "/nvmFile";
- FSFactoryProducer.getFSFactory().getFile(IoTDBDescriptor.getInstance().getConfig().getNvmDir()).mkdirs();
- nvmFileChannel = new RandomAccessFile(NVM_PATH, "rw").getChannel();
- nvmSize = nvmFileChannel.size();
+ String nvmDir = IoTDBDescriptor.getInstance().getConfig().getNvmDir();
+ nvmFilePath = nvmDir + File.separator + NVM_FILE_NAME; // File.separator joins directory and file name; pathSeparatorChar (':' or ';') only delimits path lists
+ File nvmDirFile = FSFactoryProducer.getFSFactory().getFile(nvmDir);
+ nvmDirFile.mkdirs();
+ nvmSize = nvmDirFile.getUsableSpace(); // usable bytes on the partition that holds the NVM directory
+ nvmFileChannel = new RandomAccessFile(nvmFilePath, "rw").getChannel();
+
+ initMetadataFields();
} catch (IOException e) {
- logger.error("Fail to open NVM space at {}.", NVM_PATH, e);
+ logger.error("Fail to open NVM space at {}.", nvmFilePath, e);
throw new StartupException(e);
}
}
+ private void initMetadataFields() throws IOException {
+ freeSpaceBitMap = new FreeSpaceBitMap(nvmFileChannel.map(MAP_MODE, BITMAP_FIELD_OFFSET, BITMAP_FIELD_BYTE_SIZE));
+ dataTypeMemo = new DataTypeMemo(nvmFileChannel.map(MAP_MODE, DATATYPE_FIELD_OFFSET, DATATYPE_FIELD_BYTE_SIZE));
+ tsDataMap = new TSDataMap(nvmFileChannel.map(MAP_MODE, TSID_FIELD_OFFSET, TSID_FIELD_BYTE_SIZE));
+ }
+
public void close() throws IOException {
nvmFileChannel.close();
}
@@ -50,19 +90,19 @@ public class NVMSpaceManager {
int size = 0;
switch (dataType) {
case BOOLEAN:
- size = 8;
+ size = Byte.BYTES;
break;
case INT32:
- size = Integer.SIZE;
+ size = Integer.BYTES;
break;
case INT64:
- size = Long.SIZE;
+ size = Long.BYTES;
break;
case FLOAT:
- size = Float.SIZE;
+ size = Float.BYTES;
break;
case DOUBLE:
- size = Double.SIZE;
+ size = Double.BYTES;
break;
case TEXT:
// TODO
@@ -70,113 +110,43 @@ public class NVMSpaceManager {
default:
throw new UnSupportedDataTypeException("DataType: " + dataType);
}
- return size >> 3;
+ return size;
}
- public synchronized NVMSpace allocate(long size, TSDataType dataType) {
+ public synchronized NVMSpace allocateSpace(long offset, long size) throws IOException {
+ return new NVMSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size));
+ }
+
+ public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType) {
// TODO check if full
try {
-// logger.debug("Try to allocate {} nvm space at {}.", size, curOffset);
- NVMSpace nvmSpace = new NVMSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), dataType);
- curOffset += size;
+ logger.trace("Try to allocate {} nvm space at {}.", size, curDataOffset);
+ NVMDataSpace nvmSpace = new NVMDataSpace(
+ curDataOffset, size, nvmFileChannel.map(MAP_MODE, curDataOffset, size), curDataSpaceIndex
+ .getAndIncrement(), dataType);
+ curDataOffset += size; // advance the cursor only after the mapping succeeded
return nvmSpace;
} catch (IOException e) {
// TODO deal with error
- logger.error("Fail to allocate {} nvm space at {}.", size, curOffset);
+ logger.error("Fail to allocate {} nvm space at {}.", size, curDataOffset, e); // trailing exception argument puts the cause in the log
e.printStackTrace();
return null;
}
}
- public class NVMSpace {
-
- private long offset;
- private long size;
- private ByteBuffer byteBuffer;
- private TSDataType dataType;
-
- private NVMSpace(long offset, long size, ByteBuffer byteBuffer, TSDataType dataType) {
- this.offset = offset;
- this.size = size;
- this.byteBuffer = byteBuffer;
- this.dataType = dataType;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public long getSize() {
- return size;
- }
-
- public ByteBuffer getByteBuffer() {
- return byteBuffer;
- }
-
- @Override
- public NVMSpace clone() {
- NVMSpace cloneSpace = NVMSpaceManager.getInstance().allocate(size, dataType);
- int position = byteBuffer.position();
- byteBuffer.rewind();
- cloneSpace.getByteBuffer().put(byteBuffer);
- byteBuffer.position(position);
- cloneSpace.getByteBuffer().flip();
- return cloneSpace;
- }
+ public void registerNVMDataSpace(NVMDataSpace nvmDataSpace) {
+ int spaceIndex = nvmDataSpace.getIndex();
+ freeSpaceBitMap.update(spaceIndex, false);
+ dataTypeMemo.set(spaceIndex, nvmDataSpace.getDataType());
+ }
- public Object get(int index) {
- int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
- index *= objectSize;
- Object object = null;
- switch (dataType) {
- case BOOLEAN:
- object = byteBuffer.get(index);
- break;
- case INT32:
- object = byteBuffer.getInt(index);
- break;
- case INT64:
- object = byteBuffer.getLong(index);
- break;
- case FLOAT:
- object = byteBuffer.getFloat(index);
- break;
- case DOUBLE:
- object = byteBuffer.getDouble(index);
- break;
- case TEXT:
- // TODO
- break;
- }
- return object;
- }
+ public void unregisterNVMDataSpace(NVMDataSpace nvmDataSpace) {
+ freeSpaceBitMap.update(nvmDataSpace.getIndex(), true);
+ }
- public void set(int index, Object object) {
- int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
- index *= objectSize;
- switch (dataType) {
- case BOOLEAN:
- byteBuffer.put(index, (byte) object);
- break;
- case INT32:
- byteBuffer.putInt(index, (int) object);
- break;
- case INT64:
- byteBuffer.putLong(index, (long) object);
- break;
- case FLOAT:
- byteBuffer.putFloat(index, (float) object);
- break;
- case DOUBLE:
- byteBuffer.putDouble(index, (double) object);
- break;
- case TEXT:
- // TODO
- break;
- }
- }
+ public void addSpaceToTimeSeries(String sgId, String deviceId, String measurementId, int timeSpaceIndex, int valueSpaceIndex) {
+ tsDataMap.addSpaceToTimeSeries(sgId, deviceId, measurementId, timeSpaceIndex, valueSpaceIndex);
}
public static NVMSpaceManager getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
index 83211ff..9d3a53c 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
@@ -46,14 +46,14 @@ public class MemTablePool {
}
// TODO change the impl of getAvailableMemTable to non-blocking
- public IMemTable getAvailableMemTable(Object applier, boolean nvm) {
+ public IMemTable getAvailableMemTable(Object applier, boolean nvm, String sgId) {
if (nvm) {
synchronized (availableNVMMemTables) {
if (availableNVMMemTables.isEmpty() && nvmSize < CONFIG.getMaxMemtableNumber()) {
nvmSize++;
logger.info("generated a new nvm memtable for {}, system memtable size: {}, stack size: {}",
applier, nvmSize, availableNVMMemTables.size());
- return new NVMPrimitiveMemTable();
+ return new NVMPrimitiveMemTable(sgId);
} else if (!availableNVMMemTables.isEmpty()) {
logger
.debug(
@@ -86,7 +86,7 @@ public class MemTablePool {
size++;
logger.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
applier, size, availableMemTables.size());
- return new PrimitiveMemTable();
+ return new PrimitiveMemTable(sgId);
} else if (!availableMemTables.isEmpty()) {
logger
.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
index f7f8d91..6be1d7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
@@ -25,13 +25,13 @@ import java.util.Map;
import java.util.Queue;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.nvm.datastructure.AbstractTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMBooleanTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMDoubleTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMFloatTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMIntTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMLongTVList;
-import org.apache.iotdb.db.nvm.datastructure.NVMTVList;
+import org.apache.iotdb.db.utils.datastructure.AbstractTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMBooleanTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMDoubleTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMFloatTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMIntTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMLongTVList;
+import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
@@ -59,12 +59,16 @@ public class TVListAllocator implements TVListAllocatorMBean, IService {
}
public synchronized AbstractTVList allocate(TSDataType dataType, boolean nvm) {
+ return allocate(null, null, null, dataType, nvm);
+ }
+
+ public synchronized AbstractTVList allocate(String sgId, String deviceId, String measurementId, TSDataType dataType, boolean nvm) {
AbstractTVList list = null;
if (nvm) {
Queue<NVMTVList> tvLists = nvmTVListCache.computeIfAbsent(dataType,
k -> new ArrayDeque<>());
list = tvLists.poll();
- return list != null ? list : NVMTVList.newList(dataType);
+ return list != null ? list : NVMTVList.newList(sgId, deviceId, measurementId, dataType);
} else {
Queue<TVList> tvLists = tvListCache.computeIfAbsent(dataType,
k -> new ArrayDeque<>());
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/AbstractTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/AbstractTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
index b3a6e84..14c5b64 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/AbstractTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
@@ -1,6 +1,5 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
public abstract class AbstractTVList {
@@ -115,7 +114,7 @@ public abstract class AbstractTVList {
protected abstract void reverseRange(int lo, int hi);
- protected abstract void expandValues();
+ protected abstract Object expandValues();
@Override
public abstract AbstractTVList clone();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index c56c6d4..be799bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -157,9 +157,11 @@ public class BinaryTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((Binary[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.TEXT));
+ protected Object expandValues() {
+ Binary[] binaries = (Binary[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.TEXT);
+ values.add(binaries);
+ return binaries;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 634b3fa..9b00d7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -156,9 +156,11 @@ public class BooleanTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((boolean[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.BOOLEAN));
+ protected Object expandValues() {
+ boolean[] booleans = (boolean[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.BOOLEAN);
+ values.add(booleans);
+ return booleans;
}
@Override
@@ -207,4 +209,8 @@ public class BooleanTVList extends TVList {
}
}
}
+
+ void addBatchValue(boolean[] batch) {
+ values.add(batch);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 33b76c6..1ef56c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -156,9 +156,11 @@ public class DoubleTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((double[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.DOUBLE));
+ protected Object expandValues() {
+ double[] doubles = (double[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.DOUBLE);
+ values.add(doubles);
+ return doubles;
}
@Override
@@ -207,4 +209,8 @@ public class DoubleTVList extends TVList {
}
}
}
+
+ void addBatchValue(double[] batch) {
+ values.add(batch);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index b0ed801..867c1b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -156,9 +156,11 @@ public class FloatTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((float[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.FLOAT));
+ protected Object expandValues() {
+ float[] floats = (float[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.FLOAT);
+ values.add(floats);
+ return floats;
}
@Override
@@ -207,4 +209,8 @@ public class FloatTVList extends TVList {
}
}
}
+
+ void addBatchValue(float[] batch) {
+ values.add(batch);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index e82ebea..79a9ccc 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -156,9 +156,11 @@ public class IntTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((int[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.INT32));
+ protected Object expandValues() {
+ int[] ints = (int[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.INT32);
+ values.add(ints);
+ return ints;
}
@Override
@@ -207,4 +209,8 @@ public class IntTVList extends TVList {
}
}
}
+
+ void addBatchValue(int[] batch) {
+ values.add(batch);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 604c973..5c25416 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -156,9 +156,11 @@ public class LongTVList extends TVList {
}
@Override
- protected void expandValues() {
- values.add((long[]) PrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(TSDataType.INT64));
+ protected Object expandValues() {
+ long[] longs = (long[]) PrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.INT64);
+ values.add(longs);
+ return longs;
}
@Override
@@ -207,4 +209,8 @@ public class LongTVList extends TVList {
}
}
}
+
+ void addBatchValue(long[] batch) {
+ values.add(batch);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMBooleanTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBooleanTVList.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMBooleanTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBooleanTVList.java
index 754959c..508f08c 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMBooleanTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBooleanTVList.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -13,8 +13,8 @@ public class NVMBooleanTVList extends NVMTVList {
private boolean pivotValue;
- NVMBooleanTVList() {
- super();
+ NVMBooleanTVList(String sgId, String deviceId, String measurementId) {
+ super(sgId, deviceId, measurementId);
dataType = TSDataType.BOOLEAN;
}
@@ -25,7 +25,7 @@ public class NVMBooleanTVList extends NVMTVList {
int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex).set(elementIndex, timestamp);
- values.get(arrayIndex).set(elementIndex, value);
+ values.get(arrayIndex).set(elementIndex, value ? (byte) 1 : (byte) 0);
size++;
if (sorted && size > 1 && timestamp < getTime(size - 2)) {
sorted = false;
@@ -39,15 +39,15 @@ public class NVMBooleanTVList extends NVMTVList {
}
int arrayIndex = index / ARRAY_SIZE;
int elementIndex = index % ARRAY_SIZE;
- return (boolean) values.get(arrayIndex).get(elementIndex);
+ return ((byte) values.get(arrayIndex).get(elementIndex)) == 1;
}
@Override
- public NVMBooleanTVList clone() {
- NVMBooleanTVList cloneList = new NVMBooleanTVList();
+ public BooleanTVList clone() {
+ BooleanTVList cloneList = new BooleanTVList();
cloneAs(cloneList);
- for (NVMSpace valueSpace : values) {
- cloneList.values.add(cloneValue(valueSpace));
+ for (NVMDataSpace valueSpace : values) {
+ cloneList.addBatchValue((boolean[]) cloneValue(valueSpace));
}
return cloneList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMDoubleTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMDoubleTVList.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMDoubleTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMDoubleTVList.java
index 73f3834..62cc640 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMDoubleTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMDoubleTVList.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -13,8 +13,8 @@ public class NVMDoubleTVList extends NVMTVList {
private double pivotValue;
- NVMDoubleTVList() {
- super();
+ NVMDoubleTVList(String sgId, String deviceId, String measurementId) {
+ super(sgId, deviceId, measurementId);
dataType = TSDataType.DOUBLE;
}
@@ -43,11 +43,11 @@ public class NVMDoubleTVList extends NVMTVList {
}
@Override
- public NVMDoubleTVList clone() {
- NVMDoubleTVList cloneList = new NVMDoubleTVList();
+ public DoubleTVList clone() {
+ DoubleTVList cloneList = new DoubleTVList();
cloneAs(cloneList);
- for (NVMSpace valueSpace : values) {
- cloneList.values.add(cloneValue(valueSpace));
+ for (NVMDataSpace valueSpace : values) {
+ cloneList.addBatchValue((double[]) cloneValue(valueSpace));
}
return cloneList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMFloatTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMFloatTVList.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMFloatTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMFloatTVList.java
index f70efd4..202785f 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMFloatTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMFloatTVList.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -13,8 +13,8 @@ public class NVMFloatTVList extends NVMTVList {
private float pivotValue;
- NVMFloatTVList() {
- super();
+ NVMFloatTVList(String sgId, String deviceId, String measurementId) {
+ super(sgId, deviceId, measurementId);
dataType = TSDataType.FLOAT;
}
@@ -43,11 +43,11 @@ public class NVMFloatTVList extends NVMTVList {
}
@Override
- public NVMFloatTVList clone() {
- NVMFloatTVList cloneList = new NVMFloatTVList();
+ public FloatTVList clone() {
+ FloatTVList cloneList = new FloatTVList();
cloneAs(cloneList);
- for (NVMSpace valueSpace : values) {
- cloneList.values.add(cloneValue(valueSpace));
+ for (NVMDataSpace valueSpace : values) {
+ cloneList.addBatchValue((float[]) cloneValue(valueSpace));
}
return cloneList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMIntTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMIntTVList.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMIntTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMIntTVList.java
index 61218f0..a194206 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMIntTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMIntTVList.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -13,8 +13,8 @@ public class NVMIntTVList extends NVMTVList {
private int pivotValue;
- public NVMIntTVList() {
- super();
+ public NVMIntTVList(String sgId, String deviceId, String measurementId) {
+ super(sgId, deviceId, measurementId);
dataType = TSDataType.INT32;
}
@@ -43,11 +43,11 @@ public class NVMIntTVList extends NVMTVList {
}
@Override
- public NVMIntTVList clone() {
- NVMIntTVList cloneList = new NVMIntTVList();
+ public IntTVList clone() {
+ IntTVList cloneList = new IntTVList();
cloneAs(cloneList);
- for (NVMSpace valueSpace : values) {
- cloneList.values.add(cloneValue(valueSpace));
+ for (NVMDataSpace valueSpace : values) {
+ cloneList.addBatchValue((int[]) cloneValue(valueSpace));
}
return cloneList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMLongTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMLongTVList.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMLongTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMLongTVList.java
index 4e42bb2..484b309 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMLongTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMLongTVList.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -13,8 +13,8 @@ public class NVMLongTVList extends NVMTVList {
private long pivotValue;
- NVMLongTVList() {
- super();
+ NVMLongTVList(String sgId, String deviceId, String measurementId) {
+ super(sgId, deviceId, measurementId);
dataType = TSDataType.INT64;
}
@@ -43,11 +43,11 @@ public class NVMLongTVList extends NVMTVList {
}
@Override
- public NVMLongTVList clone() {
- NVMLongTVList cloneList = new NVMLongTVList();
+ public LongTVList clone() {
+ LongTVList cloneList = new LongTVList();
cloneAs(cloneList);
- for (NVMSpace valueSpace : values) {
- cloneList.values.add(cloneValue(valueSpace));
+ for (NVMDataSpace valueSpace : values) {
+ cloneList.addBatchValue((long[]) cloneValue(valueSpace));
}
return cloneList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMTVList.java
rename to server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
index 6c9f2a4..9543f6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/datastructure/NVMTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
@@ -1,21 +1,28 @@
-package org.apache.iotdb.db.nvm.datastructure;
+package org.apache.iotdb.db.utils.datastructure;
import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSpace;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
+import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public abstract class NVMTVList extends AbstractTVList {
- protected List<NVMSpace> timestamps;
- protected List<NVMSpace> values;
+ protected String sgId;
+ protected String deviceId;
+ protected String measurementId;
+ protected List<NVMDataSpace> timestamps;
+ protected List<NVMDataSpace> values;
protected TSDataType dataType;
- public NVMTVList() {
+ public NVMTVList(String sgId, String deviceId, String measurementId) {
+ this.sgId = sgId;
+ this.deviceId = deviceId;
+ this.measurementId = measurementId;
timestamps = new ArrayList<>();
values = new ArrayList<>();
size = 0;
@@ -42,14 +49,14 @@ public abstract class NVMTVList extends AbstractTVList {
values.get(arrayIndex).set(elementIndex, value);
}
- protected NVMSpace cloneValue(NVMSpace valueSpace) {
- return valueSpace.clone();
+ protected Object cloneValue(NVMDataSpace valueSpace) {
+ return valueSpace.toArray();
}
@Override
protected void clearValue() {
if (values != null) {
- for (NVMSpace valueSpace : values) {
+ for (NVMDataSpace valueSpace : values) {
NVMPrimitiveArrayPool.getInstance().release(valueSpace, dataType);
}
values.clear();
@@ -57,9 +64,11 @@ public abstract class NVMTVList extends AbstractTVList {
}
@Override
- protected void expandValues() {
- values.add(NVMPrimitiveArrayPool
- .getInstance().getPrimitiveDataListByType(dataType));
+ protected NVMDataSpace expandValues() {
+ NVMDataSpace dataSpace = NVMPrimitiveArrayPool
+ .getInstance().getPrimitiveDataListByType(dataType);
+ values.add(dataSpace);
+ return dataSpace;
}
@Override
@@ -97,9 +106,9 @@ public abstract class NVMTVList extends AbstractTVList {
@Override
protected void cloneAs(AbstractTVList abstractCloneList) {
- NVMTVList cloneList = (NVMTVList) abstractCloneList;
- for (NVMSpace timeSpace : timestamps) {
- cloneList.timestamps.add((NVMSpace) cloneTime(timeSpace));
+ TVList cloneList = (TVList) abstractCloneList;
+ for (NVMDataSpace timeSpace : timestamps) {
+ cloneList.timestamps.add((long[]) cloneTime(timeSpace));
}
cloneList.size = size;
cloneList.sorted = sorted;
@@ -109,7 +118,7 @@ public abstract class NVMTVList extends AbstractTVList {
@Override
protected void clearTime() {
if (timestamps != null) {
- for (NVMSpace timeSpace : timestamps) {
+ for (NVMDataSpace timeSpace : timestamps) {
NVMPrimitiveArrayPool.getInstance().release(timeSpace, TSDataType.INT64);
}
timestamps.clear();
@@ -129,32 +138,36 @@ public abstract class NVMTVList extends AbstractTVList {
@Override
protected void checkExpansion() {
if ((size % ARRAY_SIZE) == 0) {
- expandValues();
- timestamps.add(NVMPrimitiveArrayPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64));
+ NVMDataSpace valueSpace = expandValues();
+ NVMDataSpace timeSpace = NVMPrimitiveArrayPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64);
+ timestamps.add(timeSpace);
+ NVMSpaceManager.getInstance().addSpaceToTimeSeries(sgId, deviceId, measurementId, timeSpace.getIndex(), valueSpace.getIndex()); // the two spaces just appended above
}
}
@Override
protected Object cloneTime(Object object) {
- NVMSpace timeSpace = (NVMSpace) object;
- return timeSpace.clone();
+ NVMDataSpace timeSpace = (NVMDataSpace) object;
+ return timeSpace.toArray();
}
- public static NVMTVList newList(TSDataType dataType) {
+ /** Creates the NVM-backed list for {@code dataType}; throws UnsupportedOperationException for TEXT and returns null for any type not handled below. */
+ public static NVMTVList newList(String sgId, String deviceId, String measurementId, TSDataType dataType) {
switch (dataType) {
case TEXT:
// TODO
// return new BinaryTVList();
+ throw new UnsupportedOperationException("TEXT is not supported by NVMTVList yet"); // without this, TEXT falls through and gets an NVMFloatTVList
case FLOAT:
- return new NVMFloatTVList();
+ return new NVMFloatTVList(sgId, deviceId, measurementId);
case INT32:
- return new NVMIntTVList();
+ return new NVMIntTVList(sgId, deviceId, measurementId);
case INT64:
- return new NVMLongTVList();
+ return new NVMLongTVList(sgId, deviceId, measurementId);
case DOUBLE:
- return new NVMDoubleTVList();
+ return new NVMDoubleTVList(sgId, deviceId, measurementId);
case BOOLEAN:
- return new NVMBooleanTVList();
+ return new NVMBooleanTVList(sgId, deviceId, measurementId);
}
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d32e7e9..f241562 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -23,7 +23,6 @@ import static org.apache.iotdb.db.rescon.PrimitiveArrayPool.ARRAY_SIZE;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.nvm.datastructure.AbstractTVList;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 148cf73..6f52c2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -61,16 +61,18 @@ public class TsFileRecoverPerformer {
private LogReplayer logReplayer;
private TsFileResource tsFileResource;
private boolean acceptUnseq;
+ private String storageGroupId;
public TsFileRecoverPerformer(String logNodePrefix,
Schema schema, VersionController versionController,
- TsFileResource currentTsFileResource, boolean acceptUnseq) {
+ TsFileResource currentTsFileResource, boolean acceptUnseq, String sgId) {
this.insertFilePath = currentTsFileResource.getFile().getPath();
this.logNodePrefix = logNodePrefix;
this.schema = schema;
this.versionController = versionController;
this.tsFileResource = currentTsFileResource;
this.acceptUnseq = acceptUnseq;
+ this.storageGroupId = sgId;
}
/**
@@ -79,7 +81,7 @@ public class TsFileRecoverPerformer {
*/
public void recover() throws StorageGroupProcessorException {
- IMemTable recoverMemTable = new PrimitiveMemTable();
+ IMemTable recoverMemTable = new PrimitiveMemTable(storageGroupId);
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
tsFileResource, schema, recoverMemTable, acceptUnseq);
@@ -197,7 +199,7 @@ public class TsFileRecoverPerformer {
private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
throws StorageGroupProcessorException {
- IMemTable recoverMemTable = new PrimitiveMemTable();
+ IMemTable recoverMemTable = new PrimitiveMemTable(storageGroupId);
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
tsFileResource, schema, recoverMemTable, acceptUnseq);
@@ -218,4 +220,7 @@ public class TsFileRecoverPerformer {
}
}
+ private void recoverNVMData() {
+
+ }
}