You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/02/28 06:57:16 UTC
[incubator-iotdb] 03/04: fix bugs
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_TsFile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3c7ea7bf2e77c5a6201fafa6a27f6fc33699191e
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Feb 28 14:30:01 2020 +0800
fix bugs
---
.../iotdb/db/engine/merge/task/MergeFileTask.java | 11 ++--
.../writelog/recover/TsFileRecoverPerformer.java | 16 ++---
.../engine/storagegroup/TsFileProcessorTest.java | 26 ++++----
.../db/integration/IoTDBFlushQueryMergeTest.java | 4 +-
.../iotdb/db/integration/IoTDBMergeTest.java | 1 -
.../reader/series/SeriesReaderByTimestampTest.java | 3 +-
.../query/reader/series/SeriesReaderTestUtil.java | 8 +--
.../iotdb/tsfile/file/metadata/TsFileMetaData.java | 6 ++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 8 +--
.../query/dataset/DataSetWithoutTimeGenerator.java | 2 +-
.../write/writer/RestorableTsFileIOWriter.java | 37 +++++------
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 71 ++++++++++------------
12 files changed, 93 insertions(+), 100 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index bdd41d5..bc8646b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -140,15 +140,14 @@ class MergeFileTask {
newFileWriter.close();
try (TsFileSequenceReader newFileReader =
new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
- List<List<ChunkMetaData>> chunkMetadataListInChunkGroups = newFileWriter
- .getChunkMetadataListInChunkGroup();
- List<String> devices = newFileWriter.getDeviceList();
+ Map<String, List<ChunkMetaData>> chunkMetadataListInChunkGroups =
+ newFileWriter.getDeviceChunkMetadataMap();
if (logger.isDebugEnabled()) {
logger.debug("{} find {} merged chunk groups", taskName, chunkMetadataListInChunkGroups.size());
}
- for (int i = 0; i < chunkMetadataListInChunkGroups.size(); i++) {
- List<ChunkMetaData> chunkMetaDataList = chunkMetadataListInChunkGroups.get(i);
- String deviceId = devices.get(i);
+ for (Map.Entry<String, List<ChunkMetaData>> entry : chunkMetadataListInChunkGroups.entrySet()) {
+ String deviceId = entry.getKey();
+ List<ChunkMetaData> chunkMetaDataList = entry.getValue();
writeMergedChunkGroup(chunkMetaDataList, deviceId, newFileReader, oldFileWriter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 6e095d9..e7a3c48 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -176,16 +176,16 @@ public class TsFileRecoverPerformer {
tsFileResource.serialize();
}
+
private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter) {
- List<String> deviceList = restorableTsFileIOWriter.getDeviceList();
- List<List<ChunkMetaData>> chunkMetaDataListInChunkGroup =
- restorableTsFileIOWriter.getChunkMetadataListInChunkGroup();
- for (int i = 0; i < deviceList.size(); i++) {
- List<ChunkMetaData> chunkMetaDataList = chunkMetaDataListInChunkGroup.get(i);
+ Map<String, List<ChunkMetaData>> deviceChunkMetaDataMap =
+ restorableTsFileIOWriter.getDeviceChunkMetadataMap();
+ for (Map.Entry<String, List<ChunkMetaData>> entry : deviceChunkMetaDataMap.entrySet()) {
+ String deviceId = entry.getKey();
+ List<ChunkMetaData> chunkMetaDataList = entry.getValue();
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
- tsFileResource
- .updateStartTime(deviceList.get(i), chunkMetaData.getStartTime());
- tsFileResource.updateEndTime(deviceList.get(i), chunkMetaData.getEndTime());
+ tsFileResource.updateStartTime(deviceId, chunkMetaData.getStartTime());
+ tsFileResource.updateEndTime(deviceId, chunkMetaData.getEndTime());
}
}
long fileVersion =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 6cc13ba..7e51cc6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -179,20 +179,24 @@ public class TsFileProcessorTest {
assertEquals(dataType, right.get(0).getDataType());
RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
- List<List<ChunkMetaData>> chunkMetaDataListInChunkGroups =
- tsFileIOWriter.getChunkMetadataListInChunkGroup();
+ Map<String, List<ChunkMetaData>> chunkMetaDataListInChunkGroups =
+ tsFileIOWriter.getDeviceChunkMetadataMap();
RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(
SystemFileFactory.INSTANCE.getFile(filePath));
- List<List<ChunkMetaData>> restoredChunkMetaDataListInChunkGroups = restorableTsFileIOWriter
- .getChunkMetadataListInChunkGroup();
+ Map<String, List<ChunkMetaData>> restoredChunkMetaDataListInChunkGroups = restorableTsFileIOWriter
+ .getDeviceChunkMetadataMap();
assertEquals(chunkMetaDataListInChunkGroups.size(), restoredChunkMetaDataListInChunkGroups.size());
- for (int i = 0; i < chunkMetaDataListInChunkGroups.size(); i++) {
- List<ChunkMetaData> chunkMetaDataListInOneChunkGroup = chunkMetaDataListInChunkGroups.get(i);
- List<ChunkMetaData> chunkMetaDataListInOneChunkGroupRestore = restoredChunkMetaDataListInChunkGroups.get(i);
- for (int j = 0; j < chunkMetaDataListInOneChunkGroup.size(); j++) {
- ChunkMetaData chunkMetaData = chunkMetaDataListInOneChunkGroup.get(j);
- ChunkMetaData chunkMetaDataRestore = chunkMetaDataListInOneChunkGroupRestore.get(j);
- assertEquals(chunkMetaData, chunkMetaDataRestore);
+ for (Map.Entry<String, List<ChunkMetaData>> entry1
+ : chunkMetaDataListInChunkGroups.entrySet()) {
+ for (Map.Entry<String, List<ChunkMetaData>> entry2
+ : restoredChunkMetaDataListInChunkGroups.entrySet()) {
+ assertEquals(entry1.getKey(), entry2.getKey());
+ assertEquals(entry1.getValue().size(), entry2.getValue().size());
+ for (int i = 0; i < entry1.getValue().size(); i++) {
+ ChunkMetaData chunkMetaData = entry1.getValue().get(i);
+ ChunkMetaData chunkMetaDataRestore = entry2.getValue().get(i);
+ assertEquals(chunkMetaData, chunkMetaDataRestore);
+ }
}
}
restorableTsFileIOWriter.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
index 8a41c63..a2670f2 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -37,7 +37,7 @@ import org.junit.Test;
public class IoTDBFlushQueryMergeTest {
private static String[] sqls = new String[]{
- "SET STORAGE GROUP TO root.vehicle.d0",
+ "SET STORAGE GROUP TO root.vehicle",
"CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
"insert into root.vehicle.d0(timestamp,s0) values(1,101)",
@@ -111,7 +111,7 @@ public class IoTDBFlushQueryMergeTest {
Class.forName(Config.JDBC_DRIVER_NAME);
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
String insertTemplate =
- "INSERT INTO root.group%d(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)";
+ "INSERT INTO root.group%d.d1(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index daf0145..d72016b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -94,7 +94,6 @@ public class IoTDBMergeTest {
long s1 = resultSet.getLong("root.mergeTest.d0.s1");
long s2 = resultSet.getLong("root.mergeTest.d0.s2");
long s3 = resultSet.getLong("root.mergeTest.d0.s3");
- System.out.println(time + " "+ s1 + " " + s2 + " " + s3);
if (i != 0) {
assertEquals(time + 10, s1);
assertEquals(time + 20, s2);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index a0cdba0..19d3c8e 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -69,9 +69,8 @@ public class SeriesReaderByTimestampTest {
new Path(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device0", "sensor0"),
TSDataType.INT32, new QueryContext(), dataSource, null);
- for (int time = 0; time < 500; time++) {
+ for (int time = 0; time < 100; time++) {
Integer value = (Integer) seriesReader.getValueInTimestamp(time);
-
if (time < 200) {
Assert.assertEquals(time + 20000, value.intValue());
} else if (time < 260 || (time >= 300 && time < 380) || (time >= 400)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index c3d5023..830cd6e 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -50,10 +50,10 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class SeriesReaderTestUtil {
- private static int seqFileNum = 5;
- private static int unseqFileNum = 5;
- private static int measurementNum = 10;
- private static int deviceNum = 10;
+ private static int seqFileNum = 1;
+ private static int unseqFileNum = 1;
+ private static int measurementNum = 1;
+ private static int deviceNum = 1;
private static long ptNum = 100;
private static long flushInterval = 20;
private static TSEncoding encoding = TSEncoding.PLAIN;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java
index a73e8dc..1a9a3a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java
@@ -48,6 +48,8 @@ public class TsFileMetaData {
private BloomFilter bloomFilter;
private Map<String, Pair<Long, Integer>> deviceMetaDataMap;
+
+ private Map<Long, Long> versionInfo;
public TsFileMetaData() {
}
@@ -175,4 +177,8 @@ public class TsFileMetaData {
this.deviceMetaDataMap = deviceMetaDataMap;
}
+ public void setVersionInfo(Map<Long, Long> versionInfo) {
+ this.versionInfo = versionInfo;
+ }
+
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 1180d26..a81a0b9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -687,10 +687,7 @@ public class TsFileSequenceReader implements AutoCloseable {
if (chunkMetadataListMap != null) {
for (ChunkMetaData chunk : chunks) {
Path path = new Path(deviceID, chunk.getMeasurementUid());
- List<ChunkMetaData> chunkMetaDataList = chunkMetadataListMap
- .getOrDefault(path, new ArrayList<>());
- chunkMetaDataList.add(chunk);
- chunkMetadataListMap.put(path, chunkMetaDataList);
+ chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(chunk);
}
}
endOffsetOfChunkGroup = this.position();
@@ -714,6 +711,7 @@ public class TsFileSequenceReader implements AutoCloseable {
// complete.
truncatedPosition = this.position() - 1;
} catch (Exception e2) {
+ e2.printStackTrace();
logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
file,
this.position(), e2.getMessage());
@@ -847,7 +845,7 @@ public class TsFileSequenceReader implements AutoCloseable {
* @param spacePartitionEndPos the end position of the space partition
* @return LocateStatus
*/
-
+ // TODO: This function is not correct.
private LocateStatus checkLocateStatus(long deviceMetadataOffset, int deviceMetadataLength,
long spacePartitionStartPos, long spacePartitionEndPos) {
long middleOffset = deviceMetadataOffset + deviceMetadataLength / 2;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
index f602fa0..948bc6d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithoutTimeGenerator.java
@@ -106,7 +106,7 @@ public class DataSetWithoutTimeGenerator extends QueryDataSet {
Field field = new Field(dataTypes.get(i));
if (!hasDataRemaining.get(i)) {
- //record.addField(new Field(null));
+ record.addField(new Field(null));
continue;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 968d244..f9855f8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -57,6 +57,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
private boolean crashed;
/**
+ *
* all chunk group metadata which have been serialized on disk.
*/
private Map<String, Map<String, List<ChunkMetaData>>> metadatas = new HashMap<>();
@@ -181,19 +182,17 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
/**
- * add all appendChunkGroupMetadatas into memory. After calling this method, other classes can
+ * add all appendChunkMetadatas into memory. After calling this method, other classes can
* read these metadata.
*/
public void makeMetadataVisible() {
- Pair<List<String>, List<List<ChunkMetaData>>> append = getAppendedRowGroupMetadata();
- List<String> newlyFlushedDeviceList = append.left;
- List<List<ChunkMetaData>> newlyFlushedMetadataList = append.right;
+ List<Pair<String, List<ChunkMetaData>>> newlyFlushedMetadataList = getAppendedRowMetadata();
if (!newlyFlushedMetadataList.isEmpty()) {
- for (int i = 0; i < newlyFlushedMetadataList.size(); i++) {
- List<ChunkMetaData> rowGroupMetaData = newlyFlushedMetadataList.get(i);
- String deviceId = newlyFlushedDeviceList.get(i);
- for (ChunkMetaData chunkMetaData : rowGroupMetaData) {
+ for (Pair<String, List<ChunkMetaData>> pair : newlyFlushedMetadataList) {
+ List<ChunkMetaData> rowMetaDataList = pair.right;
+ String deviceId = pair.left;
+ for (ChunkMetaData chunkMetaData : rowMetaDataList) {
String measurementId = chunkMetaData.getMeasurementUid();
if (!metadatas.containsKey(deviceId)) {
metadatas.put(deviceId, new HashMap<>());
@@ -212,24 +211,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
/**
- * get all the chunkGroups' metadata which are appended after the last calling of this method, or
+ * get all the chunk's metadata which are appended after the last calling of this method, or
* after the class instance is initialized if this is the first time to call the method.
*
- * @return a list of ChunkMetadataList
+ * @return a list of Device ChunkMetadataList Pair
*/
- private Pair<List<String>, List<List<ChunkMetaData>>> getAppendedRowGroupMetadata() {
- List<String> appendDevices = new ArrayList<>();
- List<List<ChunkMetaData>> appendChunkGroupMetaDataList = new ArrayList<>();
- if (lastFlushedChunkGroupIndex < chunkGroupMetaDataList.size()) {
- appendDevices
- .addAll(deviceList.subList(lastFlushedChunkGroupIndex, chunkGroupMetaDataList.size()));
- appendChunkGroupMetaDataList.addAll(chunkGroupMetaDataList
- .subList(lastFlushedChunkGroupIndex, chunkGroupMetaDataList.size()));
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+ private List<Pair<String, List<ChunkMetaData>>> getAppendedRowMetadata() {
+ List<Pair<String, List<ChunkMetaData>>> append = new ArrayList<>();
+ if (lastFlushedChunkGroupIndex < chunkGroupInfoList.size()) {
+ append.addAll(chunkGroupInfoList
+ .subList(lastFlushedChunkGroupIndex, chunkGroupInfoList.size()));
+ lastFlushedChunkGroupIndex = chunkGroupInfoList.size();
}
- Pair<List<String>, List<List<ChunkMetaData>>> append =
- new Pair<List<String>, List<List<ChunkMetaData>>>(appendDevices,
- appendChunkGroupMetaDataList);
return append;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 29ae5d2..3cc9bd8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.tsfile.write.writer;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -45,11 +36,19 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
* TSFileIOWriter is used to construct metadata and write data stored in memory to output stream.
*/
@@ -66,19 +65,18 @@ public class TsFileIOWriter {
}
protected TsFileOutput out;
+ protected List<Pair<String, List<ChunkMetaData>>> chunkGroupInfoList = new ArrayList<>();
protected boolean canWrite = true;
protected int totalChunkNum = 0;
protected int invalidChunkNum;
protected File file;
- protected List<List<ChunkMetaData>> chunkGroupMetaDataList = new ArrayList<>();
- protected List<String> deviceList = new ArrayList<>();
protected List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
protected Map<Path, List<ChunkMetaData>> chunkMetadataListMap = new TreeMap<>();
private ChunkMetaData currentChunkMetaData;
private long markedPosition;
- private Map<String, Pair<Long, Integer>> deviceMetaDataMap;
private String deviceId;
private long currentChunkGroupStartOffset;
+ private Map<Long, Long> versionInfo = new HashMap<>();
/**
* empty construct function.
@@ -151,8 +149,8 @@ public class TsFileIOWriter {
ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(deviceId, dataSize,
chunkMetaDataList.size());
chunkGroupFooter.serializeTo(out.wrapAsStream());
- chunkGroupMetaDataList.add(chunkMetaDataList);
- deviceList.add(deviceId);
+ chunkGroupInfoList.add(new Pair<>(deviceId, chunkMetaDataList));
+ versionInfo.put(out.getPosition(), version);
logger.debug("end chunk group:{}", chunkMetaDataList);
deviceId = null;
chunkMetaDataList = null;
@@ -231,10 +229,11 @@ public class TsFileIOWriter {
logger.debug("get time series list:{}", chunkMetadataListMap.keySet());
- deviceMetaDataMap = flushAllChunkMetadataList();
+ Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList();
TsFileMetaData tsFileMetaData = new TsFileMetaData();
tsFileMetaData.setDeviceMetaDataMap(deviceMetaDataMap);
+ tsFileMetaData.setVersionInfo(versionInfo);
tsFileMetaData.setTotalChunkNum(totalChunkNum);
tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
@@ -277,12 +276,10 @@ public class TsFileIOWriter {
// convert ChunkMetadataList to this field
Map<String, List<TimeseriesMetaData>> deviceTimeseriesMetadataMap = new LinkedHashMap<>();
+ // create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<ChunkMetaData>> entry : chunkMetadataListMap.entrySet()) {
Path path = entry.getKey();
String deviceId = path.getDevice();
- // create device -> TimeseriesMetaDataList Map
- List<TimeseriesMetaData> timeseriesMetadataList = deviceTimeseriesMetadataMap
- .getOrDefault(deviceId, new ArrayList<>());
// create TimeseriesMetaData
TimeseriesMetaData timeseriesMetaData = new TimeseriesMetaData();
timeseriesMetaData.setMeasurementId(path.getMeasurement());
@@ -297,8 +294,8 @@ public class TsFileIOWriter {
}
timeseriesMetaData.setStatistics(statistics);
timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetadataListLength);
- timeseriesMetadataList.add(timeseriesMetaData);
- deviceTimeseriesMetadataMap.put(deviceId, timeseriesMetadataList);
+ deviceTimeseriesMetadataMap.computeIfAbsent(deviceId, k -> new ArrayList<>())
+ .add(timeseriesMetaData);
}
// create DeviceMetaDataMap device -> Pair<TimeseriesMetaDataOffset, TimeseriesMetaDataLength>
Map<String, Pair<Long, Integer>> deviceMetadataMap = new HashMap<>();
@@ -328,12 +325,15 @@ public class TsFileIOWriter {
return out.getPosition();
}
- public List<List<ChunkMetaData>> getChunkMetadataListInChunkGroup() {
- return chunkGroupMetaDataList;
- }
-
- public List<String> getDeviceList() {
- return deviceList;
+ public Map<String, List<ChunkMetaData>> getDeviceChunkMetadataMap() {
+ Map<String, List<ChunkMetaData>> deviceChunkMetadataMap = new HashMap<>();
+ for (Map.Entry<Path, List<ChunkMetaData>> entry : chunkMetadataListMap.entrySet()) {
+ Path path = entry.getKey();
+ String deviceId = path.getDevice();
+ deviceChunkMetadataMap.computeIfAbsent(deviceId, k -> new ArrayList<>())
+ .addAll(entry.getValue());
+ }
+ return deviceChunkMetadataMap;
}
public boolean canWrite() {
@@ -384,23 +384,18 @@ public class TsFileIOWriter {
public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
Map<Path, Integer> startTimeIdxes = new HashMap<>();
chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
- Iterator<String> devicesIterator = deviceList.iterator();
- Iterator<List<ChunkMetaData>> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
- while (devicesIterator.hasNext() && chunkGroupMetaDataIterator.hasNext()) {
- List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaDataIterator.next();
- String deviceId = devicesIterator.next();
+ for (Map.Entry<Path, List<ChunkMetaData>> entry : chunkMetadataListMap.entrySet()) {
+ List<ChunkMetaData> chunkMetaDataList = entry.getValue();
+ Path path = entry.getKey();
int chunkNum = chunkMetaDataList.size();
- Iterator<ChunkMetaData> chunkMetaDataIterator = chunkMetaDataList.iterator();
- while (chunkMetaDataIterator.hasNext()) {
- ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
- Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
+ for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
int startTimeIdx = startTimeIdxes.get(path);
List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
- chunkMetaDataIterator.remove();
+ chunkMetaDataList.remove(chunkMetaData);
chunkNum--;
invalidChunkNum++;
} else {
@@ -408,7 +403,7 @@ public class TsFileIOWriter {
}
}
if (chunkNum == 0) {
- chunkGroupMetaDataIterator.remove();
+ chunkMetadataListMap.remove(path);
}
}
}