You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2020/01/08 13:52:42 UTC
[incubator-iotdb] 02/03: complete recovery
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch nvmlogging
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 57e2bc7accf6e133332f1da4dc384753e0918269
Author: mdf369 <95...@qq.com>
AuthorDate: Wed Jan 8 11:46:10 2020 +0800
complete recovery
---
.../db/nvm/memtable/NVMPrimitiveMemTable.java | 6 +-
.../apache/iotdb/db/nvm/metadata/DataTypeMemo.java | 11 +--
.../iotdb/db/nvm/metadata/FreeSpaceBitMap.java | 27 -------
.../apache/iotdb/db/nvm/metadata/OffsetMemo.java | 18 +++++
.../iotdb/db/nvm/metadata/SpaceStatusBitMap.java | 34 ++++++++
.../db/nvm/metadata/TimeseriesTimeIndexMapper.java | 31 +++-----
.../nvm/recover/NVMMemtableRecoverPerformer.java | 49 +++++-------
.../apache/iotdb/db/nvm/space/NVMDataSpace.java | 39 ++++++++-
.../apache/iotdb/db/nvm/space/NVMSpaceManager.java | 92 +++++++++-------------
.../db/nvm/space/NVMSpaceMetadataManager.java | 67 ++++++++--------
.../apache/iotdb/db/nvm/space/NVMStringBuffer.java | 58 ++++++++++++++
.../apache/iotdb/db/nvm/space/NVMStringSpace.java | 25 ------
.../java/org/apache/iotdb/db/service/IoTDB.java | 12 ++-
.../db/engine/memtable/MemTableFlushTaskTest.java | 2 +-
.../iotdb/db/engine/memtable/MemTablePoolTest.java | 2 +-
.../db/engine/memtable/MemtableBenchmark.java | 3 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 6 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 2 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 2 +-
20 files changed, 276 insertions(+), 212 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
index 45118d2..c58f7bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
@@ -8,10 +8,12 @@ import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
+import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -60,8 +62,8 @@ public class NVMPrimitiveMemTable extends AbstractMemTable {
} else {
long undeletedTime = findUndeletedTime(deviceId, measurement, timeLowerBound);
IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
- IWritableMemChunk chunkCopy = new NVMWritableMemChunk(dataType,
- (NVMTVList) memChunk.getTVList().clone());
+ IWritableMemChunk chunkCopy = new WritableMemChunk(dataType,
+ (TVList) memChunk.getTVList().clone());
chunkCopy.setTimeOffset(undeletedTime);
sorter = chunkCopy;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
index 3b3e484..5fd7761 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
@@ -1,7 +1,5 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.nvm.space.NVMSpace;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -15,12 +13,7 @@ public class DataTypeMemo extends NVMSpaceMetadata {
space.getByteBuffer().putShort(index, dataType.serialize());
}
- public List<TSDataType> getDataTypeList(int num) {
- List<TSDataType> dataTypeList = new ArrayList<>(num);
- for (int i = 0; i < num; i++) {
- TSDataType dataType = TSDataType.deserialize(space.getByteBuffer().getShort(i));
- dataTypeList.add(dataType);
- }
- return dataTypeList;
+ public TSDataType get(int index) {
+ return TSDataType.deserialize(space.getByteBuffer().getShort(index));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
deleted file mode 100644
index 9ae5cb9..0000000
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.iotdb.db.nvm.metadata;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.nvm.space.NVMSpace;
-
-public class FreeSpaceBitMap extends NVMSpaceMetadata {
-
- public FreeSpaceBitMap(NVMSpace space) {
- super(space);
- }
-
- public void update(int index, boolean setFree) {
- space.getByteBuffer().put(index, setFree ? (byte) 0 : (byte) 1);
- }
-
- public List<Integer> getValidSpaceIndexList() {
- List<Integer> freeSpaceIndexList = new ArrayList<>();
- for (int i = 0; i < space.getSize(); i++) {
- byte flag = space.getByteBuffer().get(i);
- if (flag == 1) {
- freeSpaceIndexList.add(i);
- }
- }
- return freeSpaceIndexList;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/OffsetMemo.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/OffsetMemo.java
new file mode 100644
index 0000000..ecb2554
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/OffsetMemo.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import org.apache.iotdb.db.nvm.space.NVMSpace;
+
+public class OffsetMemo extends NVMSpaceMetadata {
+
+ public OffsetMemo(NVMSpace space) {
+ super(space);
+ }
+
+ public void set(int index, long offset) {
+ space.getByteBuffer().putLong(index, offset);
+ }
+
+ public long get(int index) {
+ return space.getByteBuffer().getLong(index);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceStatusBitMap.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceStatusBitMap.java
new file mode 100644
index 0000000..783df57
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceStatusBitMap.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
+
+public class SpaceStatusBitMap extends NVMSpaceMetadata {
+
+ public SpaceStatusBitMap(NVMSpace space) {
+ super(space);
+ }
+
+ public void setUse(int index, boolean isTime) {
+ space.getByteBuffer().put(index, isTime ? (byte) 1 : (byte) 2);
+ }
+
+ public void setFree(int index) {
+ space.getByteBuffer().put(index, (byte) 0);
+ }
+
+ public List<Integer> getValidTimeSpaceIndexList(int count) {
+ List<Integer> validTimeSpaceIndexList = new ArrayList<>();
+ for (int i = 0; i < space.getSize(); i++) {
+ byte flag = space.getByteBuffer().get(i);
+ if (flag == 1) {
+ validTimeSpaceIndexList.add(i);
+ if (validTimeSpaceIndexList.size() == count) {
+ break;
+ }
+ }
+ }
+ return validTimeSpaceIndexList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeseriesTimeIndexMapper.java b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeseriesTimeIndexMapper.java
index 0a7084d..15ba4b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeseriesTimeIndexMapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeseriesTimeIndexMapper.java
@@ -2,21 +2,17 @@ package org.apache.iotdb.db.nvm.metadata;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
import org.apache.iotdb.db.nvm.space.NVMSpace;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
-import org.apache.iotdb.db.nvm.space.NVMStringSpace;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.db.nvm.space.NVMStringBuffer;
public class TimeseriesTimeIndexMapper extends NVMSpaceMetadata {
// TODO
private final long STRING_SPACE_SIZE_MAX = 1000;
- private NVMStringSpace sgSapce;
- private NVMStringSpace deviceSapce;
- private NVMStringSpace measurementSapce;
+ private NVMStringBuffer sgIdBuffer;
+ private NVMStringBuffer deviceIdBuffer;
+ private NVMStringBuffer measurementIdBuffer;
public TimeseriesTimeIndexMapper(NVMSpace space) throws IOException {
super(space);
@@ -25,17 +21,16 @@ public class TimeseriesTimeIndexMapper extends NVMSpaceMetadata {
}
private void initTimeseriesSpaces() throws IOException {
- NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
- sgSapce = spaceManager.allocateStringSpace(STRING_SPACE_SIZE_MAX);
- deviceSapce = spaceManager.allocateStringSpace(STRING_SPACE_SIZE_MAX);
- measurementSapce = spaceManager.allocateStringSpace(STRING_SPACE_SIZE_MAX);
+ sgIdBuffer = new NVMStringBuffer(STRING_SPACE_SIZE_MAX);
+ deviceIdBuffer = new NVMStringBuffer(STRING_SPACE_SIZE_MAX);
+ measurementIdBuffer = new NVMStringBuffer(STRING_SPACE_SIZE_MAX);
}
public void mapTimeIndexToTimeSeries(int timeSpaceIndex, String sgId,
String deviceId, String measurementId) {
- int sgIndex = sgSapce.put(sgId);
- int deviceIndex = deviceSapce.put(sgId);
- int measurementIndex = measurementSapce.put(sgId);
+ int sgIndex = sgIdBuffer.put(sgId);
+ int deviceIndex = deviceIdBuffer.put(deviceId);
+ int measurementIndex = measurementIdBuffer.put(measurementId);
mapTimeIndexToTimeSeries(timeSpaceIndex, sgIndex, deviceIndex, measurementIndex);
}
@@ -52,9 +47,9 @@ public class TimeseriesTimeIndexMapper extends NVMSpaceMetadata {
ByteBuffer byteBuffer = space.getByteBuffer();
int index = timeSpaceIndex * 3;
String[] timeseries = new String[3];
- timeseries[0] = sgSapce.get(byteBuffer.getInt(index));
- timeseries[1] = deviceSapce.get(byteBuffer.getInt(index + 1));
- timeseries[2] = measurementSapce.get(byteBuffer.getInt(index + 2));
+ timeseries[0] = sgIdBuffer.get(byteBuffer.getInt(index));
+ timeseries[1] = deviceIdBuffer.get(byteBuffer.getInt(index + 1));
+ timeseries[2] = measurementIdBuffer.get(byteBuffer.getInt(index + 2));
return timeseries;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
index 7e43b18..f439314 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
@@ -19,6 +19,8 @@ public class NVMMemtableRecoverPerformer {
private final static NVMMemtableRecoverPerformer INSTANCE = new NVMMemtableRecoverPerformer();
+ private NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
+ private NVMSpaceMetadataManager metadataManager = NVMSpaceMetadataManager.getInstance();
private Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>> dataMap;
private NVMMemtableRecoverPerformer() {}
@@ -31,6 +33,10 @@ public class NVMMemtableRecoverPerformer {
}
}
+ public void close() {
+ dataMap.clear();
+ }
+
public static NVMMemtableRecoverPerformer getInstance() {
return INSTANCE;
}
@@ -38,42 +44,25 @@ public class NVMMemtableRecoverPerformer {
private Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>> recoverDataInNVM()
throws IOException {
Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>> dataMap = new HashMap<>();
- Map<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>> indexMap = NVMSpaceMetadataManager.getInstance().getTimeseriesTVIndexMap();
- List<NVMDataSpace> dataList = NVMSpaceManager.getInstance().getAllNVMData();
-
- for (Entry<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>> sgIndexEntry : indexMap
- .entrySet()) {
- Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>> deviceDataMap = new HashMap<>(sgIndexEntry.getValue().size());
- dataMap.put(sgIndexEntry.getKey(), deviceDataMap);
-
- for (Entry<String, Map<String, Pair<List<Integer>, List<Integer>>>> deviceIndexEntry : sgIndexEntry
- .getValue().entrySet()) {
- Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>> measurementDataMap = new HashMap<>(deviceIndexEntry.getValue().size());
- deviceDataMap.put(deviceIndexEntry.getKey(), measurementDataMap);
-
- for (Entry<String, Pair<List<Integer>, List<Integer>>> measurementIndexEntry : deviceIndexEntry
- .getValue().entrySet()) {
- List<NVMDataSpace> timeList = convertIndexListToDataList(measurementIndexEntry.getValue().left, dataList);
- List<NVMDataSpace> valueList = convertIndexListToDataList(measurementIndexEntry.getValue().right, dataList);
- Pair<List<NVMDataSpace>, List<NVMDataSpace>> tvDataListPair = new Pair<>(timeList, valueList);
- measurementDataMap.put(measurementIndexEntry.getKey(), tvDataListPair);
- }
- }
+ List<Integer> validTimeSpaceIndexList = metadataManager.getValidTimeSpaceIndexList();
+ for (Integer timeSpaceIndex : validTimeSpaceIndexList) {
+ int valueSpaceIndex = metadataManager.getValueSpaceIndexByTimeSpaceIndex(timeSpaceIndex);
+ NVMDataSpace timeSpace = spaceManager.getNVMDataSpaceByIndex(timeSpaceIndex);
+ NVMDataSpace valueSpace = spaceManager.getNVMDataSpaceByIndex(valueSpaceIndex);
+
+ String[] timeseries = metadataManager.getTimeseriesBySpaceIndex(timeSpaceIndex);
+ Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>> deviceTVMap = dataMap.computeIfAbsent(timeseries[0], k -> new HashMap<>());
+ Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>> measurementTVMap = deviceTVMap.computeIfAbsent(timeseries[1], k -> new HashMap<>());
+ Pair<List<NVMDataSpace>, List<NVMDataSpace>> tvPairList = measurementTVMap.computeIfAbsent(timeseries[2], k -> new Pair<>(new ArrayList<>(), new ArrayList<>()));
+ tvPairList.left.add(timeSpace);
+ tvPairList.right.add(valueSpace);
}
return dataMap;
}
- private List<NVMDataSpace> convertIndexListToDataList(List<Integer> indexList, List<NVMDataSpace> totalDataList) {
- List<NVMDataSpace> dataList = new ArrayList<>(indexList.size());
- for (Integer index : indexList) {
- dataList.add(totalDataList.get(index));
- }
- return dataList;
- }
-
public void reconstructMemtable(NVMPrimitiveMemTable memTable, TsFileResource tsFileResource) {
String sgId = memTable.getStorageGroupId();
- Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>> dataOfSG = dataMap.get(sgId);
+ Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>> dataOfSG = dataMap.remove(sgId);
memTable.loadData(dataOfSG);
Map<String, Long>[] maps = getMinMaxTimeMapFromData(dataOfSG);
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
index ff203d2..b9e3d76 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
@@ -106,10 +106,41 @@ public class NVMDataSpace extends NVMSpace {
public Object toArray() {
int arraySize = (int) (size / NVMSpaceManager.getPrimitiveTypeByteSize(dataType));
- Object[] array = new Object[arraySize];
- for (int i = 0; i < arraySize; i++) {
- array[i] = get(i);
+ switch (dataType) {
+ case BOOLEAN:
+ boolean[] boolArray = new boolean[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ boolArray[i] = ((byte) get(i) == 1);
+ }
+ return boolArray;
+ case INT32:
+ int[] intArray = new int[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ intArray[i] = (int) get(i);
+ }
+ return intArray;
+ case INT64:
+ long[] longArray = new long[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ longArray[i] = (long) get(i);
+ }
+ return longArray;
+ case FLOAT:
+ float[] floatArray = new float[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ floatArray[i] = (float) get(i);
+ }
+ return floatArray;
+ case DOUBLE:
+ double[] doubleArray = new double[arraySize];
+ for (int i = 0; i < arraySize; i++) {
+ doubleArray[i] = (double) get(i);
+ }
+ return doubleArray;
+ case TEXT:
+ // TODO
+ break;
}
- return array;
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
index 3da68bf..c17729b 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
@@ -7,8 +7,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
@@ -42,7 +40,7 @@ public class NVMSpaceManager {
public void init() throws StartupException {
try {
String nvmDir = IoTDBDescriptor.getInstance().getConfig().getNvmDir();
- nvmFilePath = nvmDir + File.pathSeparatorChar + NVM_FILE_NAME;
+ nvmFilePath = nvmDir + File.separatorChar + NVM_FILE_NAME;
File nvmDirFile = FSFactoryProducer.getFSFactory().getFile(nvmDir);
nvmDirFile.mkdirs();
nvmSize = nvmDirFile.getUsableSpace();
@@ -59,45 +57,12 @@ public class NVMSpaceManager {
nvmFileChannel.close();
}
- public static int getPrimitiveTypeByteSize(TSDataType dataType) {
- int size = 0;
- switch (dataType) {
- case BOOLEAN:
- size = Byte.BYTES;
- break;
- case INT32:
- size = Integer.BYTES;
- break;
- case INT64:
- size = Long.BYTES;
- break;
- case FLOAT:
- size = Float.BYTES;
- break;
- case DOUBLE:
- size = Double.BYTES;
- break;
- case TEXT:
- // TODO
- break;
- default:
- throw new UnSupportedDataTypeException("DataType: " + dataType);
- }
- return size;
- }
-
public synchronized NVMSpace allocateSpace(long size) throws IOException {
NVMSpace nvmSpace = new NVMSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size));
curOffset += size;
return nvmSpace;
}
- public synchronized NVMStringSpace allocateStringSpace(long size) throws IOException {
- NVMStringSpace nvmSpace = new NVMStringSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size));
- curOffset += size;
- return nvmSpace;
- }
-
public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType) {
checkIsFull();
@@ -107,7 +72,7 @@ public class NVMSpaceManager {
NVMDataSpace nvmSpace = new NVMDataSpace(
curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType);
nvmSpace.refreshData();
- metadataManager.updateCount(index);
+ metadataManager.updateCount(curDataSpaceIndex.get());
curOffset += size;
return nvmSpace;
} catch (IOException e) {
@@ -122,29 +87,50 @@ public class NVMSpaceManager {
// TODO
}
- public List<NVMDataSpace> getAllNVMData() throws IOException {
- int spaceCount = metadataManager.getCount();
- List<NVMDataSpace> nvmDataList = new ArrayList<>(spaceCount);
- List<TSDataType> dataTypeList = metadataManager.getDataTypeList(spaceCount);
- long curOffset = 0;
- for (int i = 0; i < spaceCount; i++) {
- TSDataType dataType = dataTypeList.get(i);
- int spaceSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType) * ARRAY_SIZE;
- NVMDataSpace nvmDataSpace = recoverData(curOffset, spaceSize, i, dataType);
- nvmDataList.add(nvmDataSpace);
-
- curOffset += spaceSize;
- }
- return nvmDataList;
+ public NVMDataSpace getNVMDataSpaceByIndex(int spaceIndex) throws IOException {
+ long offset = metadataManager.getOffsetBySpaceIndex(spaceIndex);
+ TSDataType dataType = metadataManager.getDatatypeBySpaceIndex(spaceIndex);
+ int size = computeDataSpaceSizeByDataType(dataType);
+ return recoverData(offset, size, spaceIndex, dataType);
+ }
+
+ private int computeDataSpaceSizeByDataType(TSDataType dataType) {
+ return getPrimitiveTypeByteSize(dataType) * ARRAY_SIZE;
}
private synchronized NVMDataSpace recoverData(long offset, long size, int index, TSDataType dataType) throws IOException {
- NVMDataSpace nvmSpace = new NVMDataSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType);
- curOffset += size;
+ NVMDataSpace nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), index, dataType);
return nvmSpace;
}
public static NVMSpaceManager getInstance() {
return INSTANCE;
}
+
+ public static int getPrimitiveTypeByteSize(TSDataType dataType) {
+ int size = 0;
+ switch (dataType) {
+ case BOOLEAN:
+ size = Byte.BYTES;
+ break;
+ case INT32:
+ size = Integer.BYTES;
+ break;
+ case INT64:
+ size = Long.BYTES;
+ break;
+ case FLOAT:
+ size = Float.BYTES;
+ break;
+ case DOUBLE:
+ size = Double.BYTES;
+ break;
+ case TEXT:
+ // TODO
+ break;
+ default:
+ throw new UnSupportedDataTypeException("DataType: " + dataType);
+ }
+ return size;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
index 93f3d99..871af82 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
@@ -3,46 +3,47 @@ package org.apache.iotdb.db.nvm.space;
import static org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSPACE_NUM_MAX;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
-import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
+import org.apache.iotdb.db.nvm.metadata.OffsetMemo;
import org.apache.iotdb.db.nvm.metadata.SpaceCount;
-import org.apache.iotdb.db.nvm.metadata.TimeseriesTimeIndexMapper;
+import org.apache.iotdb.db.nvm.metadata.SpaceStatusBitMap;
import org.apache.iotdb.db.nvm.metadata.TimeValueMapper;
+import org.apache.iotdb.db.nvm.metadata.TimeseriesTimeIndexMapper;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
public class NVMSpaceMetadataManager {
private static final long SPACE_COUNT_FIELD_BYTE_SIZE = Integer.BYTES;
private static final long BITMAP_FIELD_BYTE_SIZE = Byte.BYTES * NVMSPACE_NUM_MAX;
+ private static final long OFFSET_FIELD_BYTE_SIZE = Long.BYTES * NVMSPACE_NUM_MAX;
private static final long DATATYPE_FIELD_BYTE_SIZE = Short.BYTES * NVMSPACE_NUM_MAX;
private static final long TVMAP_FIELD_BYTE_SIZE = NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT32) * NVMSPACE_NUM_MAX;
- private static final long TIMESERIES_FIELD_BYTE_SIZE = 0;
+ private static final long TSTIMEMAP_FIELD_BYTE_SIZE = NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT32) * 3 * NVMSPACE_NUM_MAX;
private final static NVMSpaceMetadataManager INSTANCE = new NVMSpaceMetadataManager();
private SpaceCount spaceCount;
- private FreeSpaceBitMap freeSpaceBitMap;
+ private SpaceStatusBitMap spaceStatusBitMap;
+ private OffsetMemo offsetMemo;
private DataTypeMemo dataTypeMemo;
private TimeValueMapper timeValueMapper;
private TimeseriesTimeIndexMapper timeseriesTimeIndexMapper;
- private NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
+ private NVMSpaceManager spaceManager;
private NVMSpaceMetadataManager() {}
public void init() throws IOException {
+ spaceManager = NVMSpaceManager.getInstance();
+
spaceCount = new SpaceCount(spaceManager.allocateSpace(SPACE_COUNT_FIELD_BYTE_SIZE));
- freeSpaceBitMap = new FreeSpaceBitMap(spaceManager.allocateSpace(BITMAP_FIELD_BYTE_SIZE));
+ spaceStatusBitMap = new SpaceStatusBitMap(spaceManager.allocateSpace(BITMAP_FIELD_BYTE_SIZE));
+ offsetMemo = new OffsetMemo(spaceManager.allocateSpace(OFFSET_FIELD_BYTE_SIZE));
dataTypeMemo = new DataTypeMemo(spaceManager.allocateSpace(DATATYPE_FIELD_BYTE_SIZE));
timeValueMapper = new TimeValueMapper(spaceManager.allocateSpace(TVMAP_FIELD_BYTE_SIZE));
- timeseriesTimeIndexMapper = new TimeseriesTimeIndexMapper(spaceManager.allocateSpace(TIMESERIES_FIELD_BYTE_SIZE));
+ timeseriesTimeIndexMapper = new TimeseriesTimeIndexMapper(spaceManager.allocateSpace(
+ TSTIMEMAP_FIELD_BYTE_SIZE));
}
public static NVMSpaceMetadataManager getInstance() {
@@ -60,8 +61,10 @@ public class NVMSpaceMetadataManager {
public void registerTVSpace(NVMDataSpace timeSpace, NVMDataSpace valueSpace, String sgId, String deviceId, String measurementId) {
int timeSpaceIndex = timeSpace.getIndex();
int valueSpaceIndex = valueSpace.getIndex();
- freeSpaceBitMap.update(timeSpaceIndex, false);
- freeSpaceBitMap.update(valueSpaceIndex, false);
+ spaceStatusBitMap.setUse(timeSpaceIndex, true);
+ spaceStatusBitMap.setUse(valueSpaceIndex, false);
+ offsetMemo.set(timeSpaceIndex, timeSpace.getOffset());
+ offsetMemo.set(valueSpaceIndex, valueSpace.getOffset());
dataTypeMemo.set(timeSpaceIndex, timeSpace.getDataType());
dataTypeMemo.set(valueSpaceIndex, valueSpace.getDataType());
@@ -71,26 +74,26 @@ public class NVMSpaceMetadataManager {
}
public void unregisterSpace(NVMDataSpace space) {
- freeSpaceBitMap.update(space.getIndex(), true);
+ spaceStatusBitMap.setFree(space.getIndex());
+ }
+
+ public List<Integer> getValidTimeSpaceIndexList() {
+ return spaceStatusBitMap.getValidTimeSpaceIndexList(getCount() / 2);
+ }
+
+ public int getValueSpaceIndexByTimeSpaceIndex(int timeSpaceIndex) {
+ return timeValueMapper.get(timeSpaceIndex);
+ }
+
+ public long getOffsetBySpaceIndex(int spaceIndex) {
+ return offsetMemo.get(spaceIndex);
}
- public Map<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>> getTimeseriesTVIndexMap() {
- Map<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>> tsTVIndexMap = new HashMap<>();
- List<Integer> validSpaceIndexList = freeSpaceBitMap.getValidSpaceIndexList();
- for (Integer timeSpaceIndex : validSpaceIndexList) {
- int valueSpaceIndex = timeValueMapper.get(timeSpaceIndex);
- String[] timeseries = timeseriesTimeIndexMapper.getTimeseries(timeSpaceIndex);
-
- Map<String, Map<String, Pair<List<Integer>, List<Integer>>>> deviceTVMap = tsTVIndexMap.computeIfAbsent(timeseries[0], k -> new HashMap<>());
- Map<String, Pair<List<Integer>, List<Integer>>> measurementTVMap = deviceTVMap.computeIfAbsent(timeseries[1], k -> new HashMap<>());
- Pair<List<Integer>, List<Integer>> tvPairList = measurementTVMap.computeIfAbsent(timeseries[2], k -> new Pair<>(new ArrayList<>(), new ArrayList<>()));
- tvPairList.left.add(timeSpaceIndex);
- tvPairList.right.add(valueSpaceIndex);
- }
- return tsTVIndexMap;
+ public TSDataType getDatatypeBySpaceIndex(int spaceIndex) {
+ return dataTypeMemo.get(spaceIndex);
}
- public List<TSDataType> getDataTypeList(int count) {
- return dataTypeMemo.getDataTypeList(count);
+ public String[] getTimeseriesBySpaceIndex(int spaceIndex) {
+ return timeseriesTimeIndexMapper.getTimeseries(spaceIndex);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringBuffer.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringBuffer.java
new file mode 100644
index 0000000..8400679
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringBuffer.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.db.nvm.space;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class NVMStringBuffer {
+
+ private List<String> existStringList;
+
+ private long size;
+ private NVMSpace count;
+ private NVMSpace lens;
+ private NVMSpace values;
+
+ public NVMStringBuffer(long size) throws IOException {
+ this.size = size;
+
+ NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
+ count = spaceManager.allocateSpace(NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT32));
+ lens = spaceManager.allocateSpace(size / 2);
+ values = spaceManager.allocateSpace(size / 2);
+
+ recover();
+ }
+
+ private void recover() {
+ int stringListLen = count.getByteBuffer().getInt(0);
+ existStringList = new ArrayList<>(stringListLen);
+ for (int i = 0; i < stringListLen; i++) {
+ int stringLen = lens.getByteBuffer().getInt(i);
+ byte[] bytes = new byte[stringLen];
+ values.getByteBuffer().get(bytes);
+ existStringList.add(new String(bytes));
+ }
+ }
+
+ public int put(String s) {
+ if (existStringList.contains(s)) {
+ return existStringList.indexOf(s);
+ } else {
+ existStringList.add(s);
+ serialize(s);
+ return existStringList.size() - 1;
+ }
+ }
+
+ private void serialize(String s) {
+ count.getByteBuffer().putInt(0, existStringList.size());
+ lens.getByteBuffer().putInt(existStringList.size() - 1, s.length());
+ values.getByteBuffer().put(s.getBytes());
+ }
+
+ public String get(int index) {
+ return existStringList.get(index);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringSpace.java
deleted file mode 100644
index d6c1fc0..0000000
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMStringSpace.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.db.nvm.space;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class NVMStringSpace extends NVMSpace {
-
- List<String> existStringList = new ArrayList<>();
-
- NVMStringSpace(long offset, long size, ByteBuffer byteBuffer) {
- super(offset, size, byteBuffer);
-
- // TODO recover set
- }
-
- public int put(String s) {
- // TODO
- }
-
- public String get(int index) {
- // TODO
- return null;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 3fc1cd8..2628ba1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -83,7 +83,11 @@ public class IoTDB implements IoTDBMBean {
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();
- NVMMemtableRecoverPerformer.getInstance().init();
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableNVM()) {
+ NVMSpaceManager.getInstance().init();
+ NVMMemtableRecoverPerformer.getInstance().init();
+ }
initMManager();
registerManager.register(StorageEngine.getInstance());
@@ -103,6 +107,10 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(MetricsService.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableNVM()) {
+ NVMMemtableRecoverPerformer.getInstance().close();
+ }
+
// When registering statMonitor, we should start recovering some statistics
// with latest values stored
// Warn: registMonitor() method should be called after systemDataRecovery()
@@ -110,8 +118,6 @@ public class IoTDB implements IoTDBMBean {
StatMonitor.getInstance().recovery();
}
- NVMSpaceManager.getInstance().init();
-
logger.info("IoTDB is set up.");
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index 8aa75dd..375960b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -49,7 +49,7 @@ public class MemTableFlushTaskTest {
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
EnvironmentUtils.envSetUp();
writer = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory().getFile(filePath));
- memTable = new PrimitiveMemTable();
+ memTable = new PrimitiveMemTable(storageGroup);
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index ea51b07..8bf97d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -47,7 +47,7 @@ public class MemTablePoolTest {
public void testGetAndRelease() {
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
- IMemTable memTable = MemTablePool.getInstance().getAvailableMemTable("test case", false);
+ IMemTable memTable = MemTablePool.getInstance().getAvailableMemTable("test case", false, "sg");
memTables.add(memTable);
}
time -= System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
index 6c03f2b..a6ba543 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
*/
public class MemtableBenchmark {
+ private static String sgId = "sg";
private static String deviceId = "d0";
private static int numOfMeasurement = 10000;
private static int numOfPoint = 1000;
@@ -39,7 +40,7 @@ public class MemtableBenchmark {
}
public static void main(String[] args) {
- IMemTable memTable = new PrimitiveMemTable();
+ IMemTable memTable = new PrimitiveMemTable(sgId);
final long startTime = System.currentTimeMillis();
// cpu not locality
for (int i = 0; i < numOfPoint; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 6ed8f5a..8893a58 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -63,7 +63,7 @@ public class PrimitiveMemTableTest {
@Test
public void simpleTest() {
- IMemTable memTable = new PrimitiveMemTable();
+ IMemTable memTable = new PrimitiveMemTable("sg");
int count = 10;
String deviceId = "d1";
String measurementId[] = new String[count];
@@ -129,7 +129,7 @@ public class PrimitiveMemTableTest {
@Test
public void testFloatType() {
- IMemTable memTable = new PrimitiveMemTable();
+ IMemTable memTable = new PrimitiveMemTable("sg");
String deviceId = "d1";
int size = 100;
write(memTable, deviceId, "s1", TSDataType.FLOAT, size);
@@ -137,7 +137,7 @@ public class PrimitiveMemTableTest {
@Test
public void testAllType() {
- IMemTable memTable = new PrimitiveMemTable();
+ IMemTable memTable = new PrimitiveMemTable("sg");
int count = 10;
String deviceId = "d1";
String measurementId[] = new String[count];
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index c1ef8a4..67108f5 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -69,7 +69,7 @@ public class LogReplayerTest {
}
};
TsFileResource tsFileResource = new TsFileResource(tsFile);
- IMemTable memTable = new PrimitiveMemTable();
+ IMemTable memTable = new PrimitiveMemTable("sg");
Schema schema = new Schema();
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index d513c83..53d43ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -135,7 +135,7 @@ public class SeqTsFileRecoverTest {
@Test
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource, true);
+ versionController, resource, true, "sg");
ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
performer.recover();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 15bd301..a5c4095 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -145,7 +145,7 @@ public class UnseqTsFileRecoverTest {
@Test
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource, true);
+ versionController, resource, true, "sg");
ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
performer.recover();