You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/10/22 07:59:30 UTC
[incubator-iotdb] 01/02: new version of sync
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync_V2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 80119d439678cf34af1f629216fcb5f9b07fee50
Author: lta <li...@163.com>
AuthorDate: Tue Oct 22 15:57:15 2019 +0800
new version of sync
---
.../engine/storagegroup/StorageGroupProcessor.java | 162 +++++++++++++--------
1 file changed, 98 insertions(+), 64 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2611b78..9ed03df 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -77,8 +77,8 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -986,6 +986,100 @@ public class StorageGroupProcessor {
}
logger.info("{} a merge task ends", storageGroupName);
}
+//
+// /**
+// * Load a new tsfile to storage group processor
+// *
+// * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+// * or unsequence list.
+// *
+// * Secondly, execute the loading process by the type.
+// *
+// * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+// *
+// * @param newTsFileResource tsfile resource
+// * @UsedBy sync module.
+// */
+// public void loadNewTsFile(TsFileResource newTsFileResource)
+// throws TsFileProcessorException {
+// File tsfileToBeInserted = newTsFileResource.getFile();
+// writeLock();
+// mergeLock.writeLock().lock();
+// try {
+// boolean isOverlap = false;
+// int preIndex = -1, subsequentIndex = sequenceFileList.size();
+//
+// // check new tsfile
+// outer:
+// for (int i = 0; i < sequenceFileList.size(); i++) {
+// if (sequenceFileList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+// return;
+// }
+// if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+// continue;
+// }
+// boolean hasPre = false, hasSubsequence = false;
+// for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+// if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+// long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+// long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+// long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+// long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+// if (startTime1 > endTime2) {
+// hasSubsequence = true;
+// } else if (startTime2 > endTime1) {
+// hasPre = true;
+// } else {
+// isOverlap = true;
+// break outer;
+// }
+// }
+// }
+// if (hasPre && hasSubsequence) {
+// isOverlap = true;
+// break;
+// }
+// if (!hasPre && hasSubsequence) {
+// subsequentIndex = i;
+// break;
+// }
+// if (hasPre) {
+// preIndex = i;
+// }
+// }
+//
+// // loading tsfile by type
+// if (isOverlap) {
+// loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+// unSequenceFileList.size());
+// } else {
+//
+// // check whether the file name needs to be renamed.
+// if (subsequentIndex != sequenceFileList.size() || preIndex == -1) {
+// String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
+// subsequentIndex);
+// if (!newFileName.equals(tsfileToBeInserted.getName())) {
+// logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
+// tsfileToBeInserted.getName(), newFileName);
+// newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+// }
+// }
+// loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+// getBinarySearchIndex(newTsFileResource));
+// }
+//
+// // update latest time map
+// updateLatestTimeMap(newTsFileResource);
+// } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+// logger.error("Failed to append the tsfile {} to storage group processor {}.",
+// tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+// IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+// throw new TsFileProcessorException(e);
+// } finally {
+// mergeLock.writeLock().unlock();
+// writeUnlock();
+// }
+// }
/**
* Load a new tsfile to storage group processor
@@ -1006,69 +1100,8 @@ public class StorageGroupProcessor {
writeLock();
mergeLock.writeLock().lock();
try {
- boolean isOverlap = false;
- int preIndex = -1, subsequentIndex = sequenceFileList.size();
-
- // check new tsfile
- outer:
- for (int i = 0; i < sequenceFileList.size(); i++) {
- if (sequenceFileList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
- return;
- }
- if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
- continue;
- }
- boolean hasPre = false, hasSubsequence = false;
- for (String device : newTsFileResource.getStartTimeMap().keySet()) {
- if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
- long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
- long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
- long startTime2 = newTsFileResource.getStartTimeMap().get(device);
- long endTime2 = newTsFileResource.getEndTimeMap().get(device);
- if (startTime1 > endTime2) {
- hasSubsequence = true;
- } else if (startTime2 > endTime1) {
- hasPre = true;
- } else {
- isOverlap = true;
- break outer;
- }
- }
- }
- if (hasPre && hasSubsequence) {
- isOverlap = true;
- break;
- }
- if (!hasPre && hasSubsequence) {
- subsequentIndex = i;
- break;
- }
- if (hasPre) {
- preIndex = i;
- }
- }
-
- // loading tsfile by type
- if (isOverlap) {
- loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
- unSequenceFileList.size());
- } else {
-
- // check whether the file name needs to be renamed.
- if (subsequentIndex != sequenceFileList.size() || preIndex == -1) {
- String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
- subsequentIndex);
- if (!newFileName.equals(tsfileToBeInserted.getName())) {
- logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
- tsfileToBeInserted.getName(), newFileName);
- newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
- }
- }
- loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
- getBinarySearchIndex(newTsFileResource));
- }
-
- // update latest time map
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ getBinarySearchIndex(newTsFileResource));
updateLatestTimeMap(newTsFileResource);
} catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
logger.error("Failed to append the tsfile {} to storage group processor {}.",
@@ -1081,6 +1114,7 @@ public class StorageGroupProcessor {
}
}
+
/**
* Get an appropriate filename to ensure the order between files. The tsfile is named after
* ({systemTime}-{versionNum}-{mergeNum}.tsfile).