You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/01 06:31:07 UTC
[incubator-iotdb] branch master updated: Fix visible metadata,
version, work processor bugs when recovering (#966)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6c23a3 Fix visible metadata, version, work processor bugs when recovering (#966)
c6c23a3 is described below
commit c6c23a3c88145fe9f4615b6987ce6842f51b91e3
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Apr 1 14:31:01 2020 +0800
Fix visible metadata, version, work processor bugs when recovering (#966)
---
.../main/java/org/apache/iotdb/SessionExample.java | 1 -
.../org/apache/iotdb/db/engine/StorageEngine.java | 3 +-
.../engine/storagegroup/StorageGroupProcessor.java | 95 +++++-----------------
.../db/engine/storagegroup/TsFileResource.java | 49 +++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 55 ++++++-------
.../org/apache/iotdb/db/utils/FilePathUtils.java | 1 +
.../writelog/recover/TsFileRecoverPerformer.java | 7 +-
...yMergeTest.java => IoTDBFlushQueryMergeIT.java} | 2 +-
...ileTest.java => IoTDBLoadExternalTsfileIT.java} | 4 +-
.../iotdb/db/integration/IoTDBRestartIT.java | 87 ++++++++++++++++++++
...impleQueryTest.java => IoTDBSimpleQueryIT.java} | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 5 ++
.../db/writelog/recover/SeqTsFileRecoverTest.java | 4 +
.../tsfile/file/metadata/ChunkGroupMetadata.java | 44 ++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 70 ++++++----------
.../write/writer/RestorableTsFileIOWriter.java | 43 +++++-----
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 90 +++++++++++---------
17 files changed, 342 insertions(+), 220 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 33e3090..70b5411 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3ef7e0a..dc8a3b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,8 +498,7 @@ public class StorageEngine implements IService {
if (!sequenceFile.isClosed()) {
continue;
}
- String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
- long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
+ long partitionNum = sequenceFile.getTimePartition();
Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
, n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e006118..0ce93bd 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -62,10 +62,8 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.LoadEmptyFileException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -81,7 +79,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
-import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
@@ -262,8 +259,7 @@ public class StorageGroupProcessor {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
@@ -271,8 +267,7 @@ public class StorageGroupProcessor {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
@@ -296,30 +291,16 @@ public class StorageGroupProcessor {
}
for (TsFileResource resource : sequenceFileTreeSet) {
- long timePartitionId = getTimePartitionFromTsFileResource(resource);
- if (timePartitionId != -1) {
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(resource.getEndTimeMap());
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .putAll(resource.getEndTimeMap());
- globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
- }
+ long timePartitionId = resource.getTimePartition();
+ latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .putAll(resource.getEndTimeMap());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .putAll(resource.getEndTimeMap());
+ globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
}
- private long getTimePartitionFromTsFileResource(TsFileResource resource) {
- // device id -> start map
- // if start time map is empty, tsfile resource is empty, return -1;
- Map<String, Long> startTimeMap = resource.getStartTimeMap();
- // just find any time of device
- Iterator<Long> iterator = startTimeMap.values().iterator();
- if (iterator.hasNext()) {
- return StorageEngine.getTimePartition(iterator.next());
- }
-
- return -1;
- }
/**
* get version controller by time partition Id Thread-safety should be ensure by caller
@@ -393,7 +374,7 @@ public class StorageGroupProcessor {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
sequenceFileTreeSet.add(tsFileResource);
- long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
@@ -408,7 +389,7 @@ public class StorageGroupProcessor {
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
- workUnsequenceTsFileProcessors
+ workSequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -422,7 +403,7 @@ public class StorageGroupProcessor {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
unSequenceFileList.add(tsFileResource);
- long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
@@ -437,6 +418,8 @@ public class StorageGroupProcessor {
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, writer);
+ workUnsequenceTsFileProcessors
+ .put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
@@ -1217,7 +1200,7 @@ public class StorageGroupProcessor {
continue;
}
- long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long partitionId = tsFileResource.getTimePartition();
deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());
// write deletion into modification file
@@ -1508,7 +1491,7 @@ public class StorageGroupProcessor {
*/
public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
- long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
+ long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
@@ -1544,7 +1527,7 @@ public class StorageGroupProcessor {
*/
public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
- long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
+ long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
@@ -1564,7 +1547,7 @@ public class StorageGroupProcessor {
// check whether the file name needs to be renamed.
if (!sequenceFileTreeSet.isEmpty()) {
String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
- getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
+ newTsFileResource.getTimePartition(), sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
@@ -1577,8 +1560,7 @@ public class StorageGroupProcessor {
// update latest time map
updateLatestTimeMap(newTsFileResource);
- String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
@@ -1594,40 +1576,6 @@ public class StorageGroupProcessor {
}
/**
- * Check and get the partition id of a TsFile to be inserted using the start times and end
- * times of devices.
- * TODO: when the partition violation happens, split the file and load into different partitions
- * @throws LoadFileException if the data of the file cross partitions or it is empty
- */
- private long getNewFilePartitionId(TsFileResource resource) throws LoadFileException {
- long partitionId = -1;
- for (Long startTime : resource.getStartTimeMap().values()) {
- long p = StorageEngine.getTimePartition(startTime);
- if (partitionId == -1) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- throw new PartitionViolationException(resource);
- }
- }
- }
- for (Long endTime : resource.getEndTimeMap().values()) {
- long p = StorageEngine.getTimePartition(endTime);
- if (partitionId == -1) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- throw new PartitionViolationException(resource);
- }
- }
- }
- if (partitionId == -1) {
- throw new LoadEmptyFileException();
- }
- return partitionId;
- }
-
- /**
* Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
* @param newTsFileResource
* @param newFilePartitionId
@@ -1715,15 +1663,10 @@ public class StorageGroupProcessor {
/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
-<<<<<<< HEAD
- * unnecessary merge. Only used when the file sender and the receiver share the same file close
- * policy.
-=======
* unnecessary merge. Only used when the file sender and the receiver share the same file
* close policy.
* Warning: DO NOT REMOVE
* @param resource
->>>>>>> master
*/
@SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ce2d7da..ce20b85 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -20,10 +20,13 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
+import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.service.UpgradeSevice;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -471,4 +474,50 @@ public class TsFileResource {
public TimeseriesMetadata getTimeSeriesMetadata() {
return timeSeriesMetadata;
}
+
+ /**
+ * Callers must make sure that either the startTimeMap is not empty
+ * or the path contains a partition folder.
+ */
+ public long getTimePartition() {
+ if (startTimeMap != null && !startTimeMap.isEmpty()) {
+ return StorageEngine.getTimePartition(startTimeMap.values().iterator().next());
+ }
+ String[] splits = FilePathUtils.splitTsFilePath(this);
+ return Long.parseLong(splits[splits.length - 2]);
+ }
+
+ /**
+ * Used when loading new TsFiles not generated by the server.
+ * Checks and gets the time partition.
+ * TODO: when the partition violation happens, split the file and load into different partitions
+ * @throws PartitionViolationException if the data of the file crosses partitions or the file is empty
+ */
+ public long getTimePartitionWithCheck() throws PartitionViolationException {
+ long partitionId = -1;
+ for (Long startTime : startTimeMap.values()) {
+ long p = StorageEngine.getTimePartition(startTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(this);
+ }
+ }
+ }
+ for (Long endTime : endTimeMap.values()) {
+ long p = StorageEngine.getTimePartition(endTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(this);
+ }
+ }
+ }
+ if (partitionId == -1) {
+ throw new PartitionViolationException(this);
+ }
+ return partitionId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index f2ef3ec..de6bfc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -113,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -571,9 +571,10 @@ public class PlanExecutor implements IPlanExecutor {
file.getAbsolutePath()));
}
Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
- Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>();
+
+ List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- reader.selfCheck(schemaMap, chunkMetaDataListMap, false);
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
}
FileLoaderUtils.checkTsFileResource(tsFileResource);
@@ -586,7 +587,7 @@ public class PlanExecutor implements IPlanExecutor {
//create schemas if they doesn't exist
if (plan.isAutoCreateSchema()) {
- createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel());
+ createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
}
StorageEngine.getInstance().loadNewTsFile(tsFileResource);
@@ -596,36 +597,34 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap,
+ private void createSchemaAutomatically(
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
Map<Path, MeasurementSchema> knownSchemas, int sgLevel)
throws QueryProcessException, MetadataException {
- if (chunkMetaDataListMap.isEmpty()) {
+ if (chunkGroupMetadataList.isEmpty()) {
return;
}
- for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) {
- String device = entry.getKey().getDevice();
+
+ Set<Path> registeredSeries = new HashSet<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ String device = chunkGroupMetadata.getDevice();
MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
- for (ChunkMetadata chunkMetaData : entry.getValue()) {
- String measurement = chunkMetaData.getMeasurementUid();
- String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement;
- MeasurementSchema schema = knownSchemas.get(entry.getKey());
- if (schema == null) {
- throw new MetadataException(String
- .format("Can not get the schema of measurement [%s]", measurement));
- }
- if (!node.hasChild(measurement)) {
- try {
- mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(),
- schema.getCompressor(), Collections.emptyMap());
- } catch (MetadataException e) {
- if (!e.getMessage().contains("already exist")) {
- throw e;
- }
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ if (!registeredSeries.contains(series)) {
+ registeredSeries.add(series);
+ MeasurementSchema schema = knownSchemas.get(series);
+ if (schema == null) {
+ throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
+ chunkMetadata.getMeasurementUid()));
+ }
+ if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
+ mManager.createTimeseries(series.getFullPath(), schema.getType(),
+ schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
+ } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
+ throw new QueryProcessException(
+ String.format("Current Path is not leaf node. %s", series));
}
- }
- if (node.getChild(measurement) instanceof InternalMNode) {
- throw new QueryProcessException(
- String.format("Current Path is not leaf node. %s", fullPath));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index d89e6fb..5e9297a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -45,4 +45,5 @@ public class FilePathUtils {
public static String[] splitTsFilePath(TsFileResource resource) {
return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
}
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index ad22e19..f126c78 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -58,7 +58,6 @@ public class TsFileRecoverPerformer {
private String insertFilePath;
private String logNodePrefix;
private VersionController versionController;
- private LogReplayer logReplayer;
private TsFileResource resource;
private boolean acceptUnseq;
private boolean isLastFile;
@@ -81,9 +80,6 @@ public class TsFileRecoverPerformer {
*/
public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
- IMemTable recoverMemTable = new PrimitiveMemTable();
- this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
- versionController, resource, recoverMemTable, acceptUnseq);
File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
if (!insertFile.exists()) {
logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
@@ -191,13 +187,12 @@ public class TsFileRecoverPerformer {
private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
- this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
+ LogReplayer logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
versionController, resource, recoverMemTable, acceptUnseq);
logReplayer.replayLogs();
try {
if (!recoverMemTable.isEmpty()) {
// flush logs
-
MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
restorableTsFileIOWriter, resource.getFile().getParentFile().getParentFile().getName());
tableFlushTask.syncFlushMemTable();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
index bdac19b..7acd7a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
@@ -34,7 +34,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-public class IoTDBFlushQueryMergeTest {
+public class IoTDBFlushQueryMergeIT {
private static String[] sqls = new String[]{
"SET STORAGE GROUP TO root.vehicle.d0",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3618e2c..ec9092a 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -42,9 +42,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IoTDBLoadExternalTsfileTest {
+public class IoTDBLoadExternalTsfileIT {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileTest.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileIT.class);
private static String[] insertSequenceSqls = new String[]{
"SET STORAGE GROUP TO root.vehicle",
"SET STORAGE GROUP TO root.test",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
new file mode 100644
index 0000000..931ecb2
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.Test;
+
+public class IoTDBRestartIT {
+
+ @Test
+ public void testRestart()
+ throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(1,1)");
+ statement.execute("flush");
+ }
+
+ EnvironmentUtils.restartDaemon();
+
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(2,1)");
+ }
+
+ EnvironmentUtils.restartDaemon();
+
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("insert into root.turbine.d1(timestamp,s1) values(3,1)");
+
+ boolean hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1");
+ assertTrue(hasResultSet);
+ String[] exp = new String[]{
+ "1,1",
+ "2,1",
+ "3,1"
+ };
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+ assertEquals(exp[cnt], result);
+ cnt++;
+ }
+ }
+
+ EnvironmentUtils.cleanEnv();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 9c05af0..0425390 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -32,7 +32,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class IoTDBSimpleQueryTest {
+public class IoTDBSimpleQueryIT {
@Before
public void setUp() throws Exception {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 716e14b..37162b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -231,6 +231,11 @@ public class EnvironmentUtils {
}
}
+ public static void restartDaemon() {
+ stopDaemon();
+ reactiveDaemon();
+ }
+
private static void createAllDir() {
// create sequential files
for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 8480e4a..3c2a38b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -215,6 +215,10 @@ public class SeqTsFileRecoverTest {
versionController, resource, true, true);
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
RestorableTsFileIOWriter writer = performer.recover();
+
+ writer.makeMetadataVisible();
+ assertEquals(11, writer.getMetadatasForQuery().size());
+
assertTrue(writer.canWrite());
writer.endFile();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java
new file mode 100644
index 0000000..d01f041
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata;
+
+import java.util.List;
+
+/**
+ * Only maintained when writing, not serialized to TsFile
+ */
+public class ChunkGroupMetadata {
+
+ private String device;
+
+ private List<ChunkMetadata> chunkMetadataList;
+
+ public ChunkGroupMetadata(String device, List<ChunkMetadata> chunkMetadataList) {
+ this.device = device;
+ this.chunkMetadataList = chunkMetadataList;
+ }
+
+ public String getDevice() {
+ return device;
+ }
+
+ public List<ChunkMetadata> getChunkMetadataList() {
+ return chunkMetadataList;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 93354a6..b7122b8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
@@ -561,10 +562,8 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Self Check the file and return the position before where the data is safe.
*
- * @param newSchema @OUT. the measurement schema in the file will be added into this parameter.
- * (can be null)
- * @param chunkMetadataListMap @OUT. the treeMap (Path -> ChunkmetadataList)
- * (can be null)
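+ * @param newSchema @OUT. the schema of each time series found in the file is added to this map
+ * @param chunkGroupMetadataList @OUT. the ChunkGroupMetadata of each complete ChunkGroup is appended to this list (must not be null)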
* @param fastFinish if true and the file is complete, then newSchema and newMetaData parameter
* will be not modified.
* @return the position of the file that is fine. All data after the position in the file should
@@ -572,8 +571,7 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public long selfCheck(Map<Path, MeasurementSchema> newSchema,
- Map<Path, List<ChunkMetadata>> chunkMetadataListMap,
- boolean fastFinish) throws IOException {
+ List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish) throws IOException {
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
if (!checkFile.exists()) {
@@ -586,7 +584,8 @@ public class TsFileSequenceReader implements AutoCloseable {
TSDataType dataType;
long fileOffsetOfChunk;
- List<ChunkMetadata> chunks = null;
+ // ChunkMetadata of current ChunkGroup
+ List<ChunkMetadata> chunkMetadataList = null;
String deviceID;
int position = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
@@ -611,56 +610,44 @@ public class TsFileSequenceReader implements AutoCloseable {
boolean newChunkGroup = true;
// not a complete file, we will recover it...
long truncatedPosition = TSFileConfig.MAGIC_STRING.getBytes().length;
- boolean goon = true;
byte marker;
int chunkCnt = 0;
List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
try {
- while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+ while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
// this is the first chunk of a new ChunkGroup.
if (newChunkGroup) {
newChunkGroup = false;
- chunks = new ArrayList<>();
+ chunkMetadataList = new ArrayList<>();
}
fileOffsetOfChunk = this.position() - 1;
// if there is something wrong with a chunk, we will drop the whole ChunkGroup
// as different chunks may be created by the same insertions(sqls), and partial
// insertion is not tolerable
- ChunkHeader header = this.readChunkHeader();
- measurementID = header.getMeasurementID();
+ ChunkHeader chunkHeader = this.readChunkHeader();
+ measurementID = chunkHeader.getMeasurementID();
MeasurementSchema measurementSchema = new MeasurementSchema(measurementID,
- header.getDataType(),
- header.getEncodingType(), header.getCompressionType());
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(), chunkHeader.getCompressionType());
measurementSchemaList.add(measurementSchema);
- dataType = header.getDataType();
+ dataType = chunkHeader.getDataType();
Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
- if (header.getNumOfPages() > 0) {
- PageHeader pageHeader = this.readPageHeader(header.getDataType());
- chunkStatistics.mergeStatistics(pageHeader.getStatistics());
- this.skipPageData(pageHeader);
- }
- for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+ for (int j = 0; j < chunkHeader.getNumOfPages(); j++) {
// a new Page
- PageHeader pageHeader = this.readPageHeader(header.getDataType());
- chunkStatistics.mergeStatistics(pageHeader.getStatistics());
- this.skipPageData(pageHeader);
- }
- if (header.getNumOfPages() > 1) {
- PageHeader pageHeader = this.readPageHeader(header.getDataType());
+ PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType());
chunkStatistics.mergeStatistics(pageHeader.getStatistics());
this.skipPageData(pageHeader);
}
currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk,
chunkStatistics);
- chunks.add(currentChunk);
+ chunkMetadataList.add(currentChunk);
chunkCnt++;
break;
case MetaMarker.CHUNK_GROUP_FOOTER:
// this is a chunk group
- // if there is something wrong with the ChunkGroup Footer, we will drop this
- // ChunkGroup
+ // if there is something wrong with the ChunkGroup Footer, we will drop this ChunkGroup
// because we can not guarantee the correctness of the deviceId.
ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
deviceID = chunkGroupFooter.getDeviceID();
@@ -669,12 +656,7 @@ public class TsFileSequenceReader implements AutoCloseable {
newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema);
}
}
- if (chunkMetadataListMap != null) {
- for (ChunkMetadata chunk : chunks) {
- Path path = new Path(deviceID, chunk.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(chunk);
- }
- }
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
newChunkGroup = true;
truncatedPosition = this.position();
@@ -684,23 +666,17 @@ public class TsFileSequenceReader implements AutoCloseable {
break;
default:
// the disk file is corrupted, using this file may be dangerous
- MetaMarker.handleUnexpectedMarker(marker);
- goon = false;
- logger.error(String
- .format("Unrecognized marker detected, this file {%s} may be corrupted", file));
+ throw new IOException("Unexpected marker " + marker);
}
}
// now we read the tail of the data section, so we are sure that the last
- // ChunkGroupFooter is
- // complete.
+ // ChunkGroupFooter is complete.
truncatedPosition = this.position() - 1;
- } catch (Exception e2) {
+ } catch (Exception e) {
logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
- file,
- this.position(), e2.getMessage());
+ file, this.position(), e.getMessage());
}
- // Despite the completeness of the data section, we will discard current
- // FileMetadata
+ // Despite the completeness of the data section, we will discard current FileMetadata
// so that we can continue to write data into this tsfile.
return truncatedPosition;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index c1fa885..bef6bb2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -59,7 +60,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
/**
* all chunk group metadata which have been serialized on disk.
*/
- private Map<String, Map<String, List<ChunkMetadata>>> metadatas = new HashMap<>();
+ private Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = new HashMap<>();
/**
* @param file a given tsfile path you want to (continue to) write
@@ -90,7 +91,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
// uncompleted file
- truncatedPosition = reader.selfCheck(knownSchemas, chunkMetadataListMap, true);
+ truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
totalChunkNum = reader.getTotalChunkNum();
if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
@@ -168,8 +169,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
TSDataType dataType) {
List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) {
- for (ChunkMetadata chunkMetaData : metadatas.get(deviceId).get(measurementId)) {
+ if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+ for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
// filter: if adevice'sensor is defined as float type, and data has been persistent.
// Then someone deletes the timeseries and recreate it with Int type. We have to ignore
// all the stale data.
@@ -181,26 +182,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return chunkMetadataList;
}
+ public Map<String, Map<String, List<ChunkMetadata>>> getMetadatasForQuery() {
+ return metadatasForQuery;
+ }
+
/**
* add all appendChunkMetadatas into memory. After calling this method, other classes can
* read these metadata.
*/
public void makeMetadataVisible() {
- List<Pair<String, List<ChunkMetadata>>> newlyFlushedMetadataList = getAppendedRowMetadata();
+ List<ChunkGroupMetadata> newlyFlushedMetadataList = getAppendedRowMetadata();
if (!newlyFlushedMetadataList.isEmpty()) {
- for (Pair<String, List<ChunkMetadata>> pair : newlyFlushedMetadataList) {
- List<ChunkMetadata> rowMetaDataList = pair.right;
- String deviceId = pair.left;
+ for (ChunkGroupMetadata chunkGroupMetadata : newlyFlushedMetadataList) {
+ List<ChunkMetadata> rowMetaDataList = chunkGroupMetadata.getChunkMetadataList();
+ String device = chunkGroupMetadata.getDevice();
for (ChunkMetadata chunkMetaData : rowMetaDataList) {
String measurementId = chunkMetaData.getMeasurementUid();
- if (!metadatas.containsKey(deviceId)) {
- metadatas.put(deviceId, new HashMap<>());
+ if (!metadatasForQuery.containsKey(device)) {
+ metadatasForQuery.put(device, new HashMap<>());
}
- if (!metadatas.get(deviceId).containsKey(measurementId)) {
- metadatas.get(deviceId).put(measurementId, new ArrayList<>());
+ if (!metadatasForQuery.get(device).containsKey(measurementId)) {
+ metadatasForQuery.get(device).put(measurementId, new ArrayList<>());
}
- metadatas.get(deviceId).get(measurementId).add(chunkMetaData);
+ metadatasForQuery.get(device).get(measurementId).add(chunkMetaData);
}
}
}
@@ -216,12 +221,12 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
*
* @return a list of Device ChunkMetadataList Pair
*/
- private List<Pair<String, List<ChunkMetadata>>> getAppendedRowMetadata() {
- List<Pair<String, List<ChunkMetadata>>> append = new ArrayList<>();
- if (lastFlushedChunkGroupIndex < chunkGroupInfoList.size()) {
- append.addAll(chunkGroupInfoList
- .subList(lastFlushedChunkGroupIndex, chunkGroupInfoList.size()));
- lastFlushedChunkGroupIndex = chunkGroupInfoList.size();
+ private List<ChunkGroupMetadata> getAppendedRowMetadata() {
+ List<ChunkGroupMetadata> append = new ArrayList<>();
+ if (lastFlushedChunkGroupIndex < chunkGroupMetadataList.size()) {
+ append.addAll(chunkGroupMetadataList
+ .subList(lastFlushedChunkGroupIndex, chunkGroupMetadataList.size()));
+ lastFlushedChunkGroupIndex = chunkGroupMetadataList.size();
}
return append;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0339f5b..a8146c8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,11 +18,13 @@
*/
package org.apache.iotdb.tsfile.write.writer;
+import java.util.Iterator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
@@ -39,7 +41,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,7 +51,7 @@ import java.util.Map;
import java.util.TreeMap;
/**
- * TSFileIOWriter is used to construct metadata and write data stored in memory to output stream.
+ * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
*/
public class TsFileIOWriter {
@@ -65,16 +66,21 @@ public class TsFileIOWriter {
}
protected TsFileOutput out;
- protected List<Pair<String, List<ChunkMetadata>>> chunkGroupInfoList = new ArrayList<>();
protected boolean canWrite = true;
protected int totalChunkNum = 0;
protected int invalidChunkNum;
protected File file;
- protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- protected Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+
+ // current flushed Chunk
private ChunkMetadata currentChunkMetadata;
+ // ChunkMetadata of the current ChunkGroup (reset in startChunkGroup, handed over in endChunkGroup)
+ protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
+ // all flushed ChunkGroups
+ protected List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+
private long markedPosition;
- private String deviceId;
+ private String currentChunkGroupDeviceId;
private long currentChunkGroupStartOffset;
private List<Pair<Long, Long>> versionInfo = new ArrayList<>();
@@ -127,7 +133,7 @@ public class TsFileIOWriter {
}
public void startChunkGroup(String deviceId) throws IOException {
- this.deviceId = deviceId;
+ this.currentChunkGroupDeviceId = deviceId;
currentChunkGroupStartOffset = out.getPosition();
logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
chunkMetadataList = new ArrayList<>();
@@ -137,16 +143,16 @@ public class TsFileIOWriter {
* end chunk and write some log. If there is no data in the chunk group, nothing will be flushed.
*/
public void endChunkGroup() throws IOException {
- if (deviceId == null || chunkMetadataList.isEmpty()) {
+ if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
return;
}
long dataSize = out.getPosition() - currentChunkGroupStartOffset;
- ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(deviceId, dataSize,
+ ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize,
chunkMetadataList.size());
chunkGroupFooter.serializeTo(out.wrapAsStream());
- chunkGroupInfoList.add(new Pair<>(deviceId, chunkMetadataList));
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
logger.debug("end chunk group:{}", chunkMetadataList);
- deviceId = null;
+ currentChunkGroupDeviceId = null;
chunkMetadataList = null;
}
@@ -204,8 +210,6 @@ public class TsFileIOWriter {
*/
public void endCurrentChunk() {
chunkMetadataList.add(currentChunkMetadata);
- Path path = new Path(deviceId, currentChunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(currentChunkMetadata);
currentChunkMetadata = null;
totalChunkNum++;
}
@@ -219,10 +223,17 @@ public class TsFileIOWriter {
// serialize the SEPARATOR of MetaData
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
- logger.debug("get time series list:{}", chunkMetadataListMap.keySet());
-
- Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList();
+
+ // group ChunkMetadata by series
+ Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+ for (ChunkGroupMetadata chunkGroupMetadata: chunkGroupMetadataList) {
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ }
+ }
+
+ Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
@@ -231,7 +242,9 @@ public class TsFileIOWriter {
tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
long footerIndex = out.getPosition();
- logger.debug("start to flush the footer,file pos:{}", footerIndex);
+ if (logger.isDebugEnabled()) {
+ logger.debug("start to flush the footer,file pos:{}", footerIndex);
+ }
// write TsFileMetaData
int size = tsFileMetaData.serializeTo(out.wrapAsStream());
@@ -253,19 +266,18 @@ public class TsFileIOWriter {
// close file
out.close();
- if (resourceLogger.isInfoEnabled() && file != null) {
- resourceLogger.info("{} writer is closed.", file.getName());
+ if (resourceLogger.isDebugEnabled() && file != null) {
+ resourceLogger.debug("{} writer is closed.", file.getName());
}
canWrite = false;
- chunkMetadataListMap = new TreeMap<>();
- logger.info("output stream is closed");
}
/**
* Flush ChunkMetadataList and TimeseriesMetaData
* @return DeviceMetaDataMap in TsFileMetaData
*/
- private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList() throws IOException {
+ private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList(
+ Map<Path, List<ChunkMetadata>> chunkMetadataListMap) throws IOException {
// convert ChunkMetadataList to this field
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new LinkedHashMap<>();
@@ -304,7 +316,7 @@ public class TsFileIOWriter {
size += timeseriesMetaData.serializeTo(out.wrapAsStream());
}
deviceMetadataMap
- .put(device, new Pair<Long, Integer>(offsetOfFirstTimeseriesMetaDataInDevice, size));
+ .put(device, new Pair<>(offsetOfFirstTimeseriesMetaDataInDevice, size));
}
// return
return deviceMetadataMap;
@@ -323,11 +335,10 @@ public class TsFileIOWriter {
// device -> ChunkMetadataList
public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() {
Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
- for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- Path path = entry.getKey();
- String device = path.getDevice();
- deviceChunkMetadataMap.computeIfAbsent(device, k -> new ArrayList<>())
- .addAll(entry.getValue());
+
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
+ .addAll(chunkGroupMetadata.getChunkMetadataList());
}
return deviceChunkMetadataMap;
}
@@ -376,22 +387,27 @@ public class TsFileIOWriter {
/**
* Remove such ChunkMetadata that its startTime is not in chunkStartTimes
*/
-
public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
Map<Path, Integer> startTimeIdxes = new HashMap<>();
chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
- for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- List<ChunkMetadata> chunkMetadatas = entry.getValue();
- Path path = entry.getKey();
- int chunkNum = chunkMetadatas.size();
- for (ChunkMetadata chunkMetaData : chunkMetadatas) {
+ Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
+ while (chunkGroupMetaDataIterator.hasNext()) {
+ ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+ String deviceId = chunkGroupMetaData.getDevice();
+ int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
+ Iterator<ChunkMetadata> chunkMetaDataIterator = chunkGroupMetaData.getChunkMetadataList()
+ .iterator();
+ while (chunkMetaDataIterator.hasNext()) {
+ ChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
+ Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
int startTimeIdx = startTimeIdxes.get(path);
+
List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
- chunkMetadatas.remove(chunkMetaData);
+ chunkMetaDataIterator.remove();
chunkNum--;
invalidChunkNum++;
} else {
@@ -399,7 +415,7 @@ public class TsFileIOWriter {
}
}
if (chunkNum == 0) {
- chunkMetadataListMap.remove(path);
+ chunkGroupMetaDataIterator.remove();
}
}
}