You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/29 02:47:41 UTC
[incubator-iotdb] branch merge updated: change back recover and
query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/merge by this push:
new e55e752 change back recover and query
e55e752 is described below
commit e55e75254ae0f890919052152030bfe6b2c96b52
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Jul 29 10:48:00 2020 +0800
change back recover and query
---
.../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++--------
.../writelog/recover/TsFileRecoverPerformer.java | 50 ++++++++--------------
2 files changed, 34 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3de4202..2fa9008 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -551,12 +551,9 @@ public class StorageGroupProcessor {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
- List<List<TsFileResource>> defaultVmTsFileResources = new ArrayList<>();
- defaultVmTsFileResources.add(new ArrayList<>());
-
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
storageGroupName + FILE_NAME_SEPARATOR,
- getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
+ getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
i == tsFiles.size() - 1);
RestorableTsFileIOWriter writer;
@@ -580,15 +577,14 @@ public class StorageGroupProcessor {
this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
true, writer);
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
- tsFileResource.setProcessor(tsFileProcessor);
} else {
tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
writer);
workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
- tsFileResource.setProcessor(tsFileProcessor);
}
+ tsFileResource.setProcessor(tsFileProcessor);
tsFileResource.removeResourceFile();
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
@@ -744,11 +740,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -898,10 +894,10 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param fileList file list to add new processor
- * @param sequence whether is sequence or not
+ * @param fileList file list to add new processor
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1347,10 +1343,10 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param startTime the startTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(String deviceId, String measurementId, long startTime, long endTime)
throws IOException {
@@ -2107,9 +2103,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2173,8 +2169,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f9ceaa7..be0ebb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -23,15 +23,12 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -46,7 +43,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,12 +55,12 @@ public class TsFileRecoverPerformer {
private static final Logger logger = LoggerFactory.getLogger(TsFileRecoverPerformer.class);
- private String filePath;
- private String logNodePrefix;
- private VersionController versionController;
- private TsFileResource tsFileResource;
- private boolean sequence;
- private boolean isLastFile;
+ private final String filePath;
+ private final String logNodePrefix;
+ private final VersionController versionController;
+ private final TsFileResource tsFileResource;
+ private final boolean sequence;
+ private final boolean isLastFile;
/**
* @param isLastFile whether this TsFile is the last file of its partition
@@ -110,29 +106,22 @@ public class TsFileRecoverPerformer {
// judge whether tsfile is complete
if (!restorableTsFileIOWriter.hasCrashed()) {
- // if tsfile is complete, then there is no vmfiles.
try {
- recoverResource(tsFileResource);
+ recoverResource();
return restorableTsFileIOWriter;
} catch (IOException e) {
throw new StorageGroupProcessorException(
- "recover the resource file failed: " + filePath
- + RESOURCE_SUFFIX + e);
+ "recover the resource file failed: " + filePath + RESOURCE_SUFFIX + e);
}
}
// tsfile has crashed
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
- recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+ recoverResourceFromWriter(restorableTsFileIOWriter);
- // If the vm is not enable, the walTargetWriter points to the tsfile.
- // If the vm is enable and flush log exists, the walTargetWriter points to the vm of the flush log
- // if the vm is enable and flush log does not exist, the walTargetWriter is null.
- RestorableTsFileIOWriter walTargetWriter = restorableTsFileIOWriter;
- TsFileResource walTargetResource = tsFileResource;
// redo logs
- redoLogs(walTargetWriter, walTargetResource, restorableTsFileIOWriter);
+ redoLogs(restorableTsFileIOWriter);
// clean logs
try {
@@ -144,10 +133,10 @@ public class TsFileRecoverPerformer {
return restorableTsFileIOWriter;
}
- private void recoverResource(TsFileResource tsFileResource) throws IOException {
+ private void recoverResource() throws IOException {
if (tsFileResource.resourceFileExists()) {
// .resource file exists, deserialize it
- recoverResourceFromFile(tsFileResource);
+ recoverResourceFromFile();
} else {
// .resource file does not exist, read file metadata and recover tsfile resource
try (TsFileSequenceReader reader = new TsFileSequenceReader(
@@ -163,18 +152,18 @@ public class TsFileRecoverPerformer {
}
}
- private void recoverResourceFromFile(TsFileResource tsFileResource) throws IOException {
+ private void recoverResourceFromFile() throws IOException {
try {
tsFileResource.deserialize();
} catch (IOException e) {
logger.warn("Cannot deserialize TsFileResource {}, construct it using "
+ "TsFileSequenceReader", tsFileResource.getTsFile(), e);
- recoverResourceFromReader(tsFileResource);
+ recoverResourceFromReader();
}
}
- private void recoverResourceFromReader(TsFileResource tsFileResource) throws IOException {
+ private void recoverResourceFromReader() throws IOException {
try (TsFileSequenceReader reader =
new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) {
for (Entry<String, List<TimeseriesMetadata>> entry : reader.getAllTimeseriesMetadata()
@@ -192,8 +181,7 @@ public class TsFileRecoverPerformer {
}
- private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter,
- TsFileResource tsFileResource) {
+ private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter) {
Map<String, List<ChunkMetadata>> deviceChunkMetaDataMap =
restorableTsFileIOWriter.getDeviceChunkMetadataMap();
for (Map.Entry<String, List<ChunkMetadata>> entry : deviceChunkMetaDataMap.entrySet()) {
@@ -209,15 +197,13 @@ public class TsFileRecoverPerformer {
tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
}
- private boolean redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
- TsFileResource tsFileResource, RestorableTsFileIOWriter tsFileIOWriter)
+ private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
recoverMemTable.setVersion(versionController.nextVersion());
LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
versionController, tsFileResource, recoverMemTable, sequence);
logReplayer.replayLogs();
- boolean res = false;
try {
if (!recoverMemTable.isEmpty()) {
// flush logs
@@ -225,7 +211,6 @@ public class TsFileRecoverPerformer {
restorableTsFileIOWriter,
tsFileResource.getTsFile().getParentFile().getParentFile().getName());
tableFlushTask.syncFlushMemTable();
- res = true;
}
if (!isLastFile || tsFileResource.isCloseFlagSet()) {
@@ -236,7 +221,6 @@ public class TsFileRecoverPerformer {
}
// otherwise this file is not closed before crush, do nothing so we can continue writing
// into it
- return res;
} catch (IOException | InterruptedException | ExecutionException e) {
throw new StorageGroupProcessorException(e);
}