You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/28 12:54:56 UTC
[incubator-iotdb] branch add_load_tsfile_feature updated: complete
load tsfile and fix some bugs of corner cases
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch add_load_tsfile_feature
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/add_load_tsfile_feature by this push:
new b68c549 complete load tsfile and fix some bugs of corner cases
b68c549 is described below
commit b68c5499adbe1370c33a1196045ac415ec23cbdc
Author: lta <li...@163.com>
AuthorDate: Thu Nov 28 20:54:39 2019 +0800
complete load tsfile and fix some bugs of corner cases
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 30 +++-
.../engine/storagegroup/StorageGroupProcessor.java | 158 ++++++++++++++++++++-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 16 ++-
.../iotdb/db/sync/receiver/load/FileLoader.java | 2 +-
.../{MemEst => memestimation}/MemEstTool.java | 2 +-
.../{MemEst => memestimation}/MemEstToolCmd.java | 2 +-
6 files changed, 195 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1fa1ced..e8c4c34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -428,6 +428,12 @@ public class StorageEngine implements IService {
}
}
+ public void loadNewTsFileForSync(TsFileResource newTsFileResource)
+ throws TsFileProcessorException, StorageEngineException {
+ getProcessor(newTsFileResource.getFile().getParentFile().getName())
+ .loadNewTsFileForSync(newTsFileResource);
+ }
+
public void loadNewTsFile(TsFileResource newTsFileResource)
throws TsFileProcessorException, StorageEngineException {
getProcessor(newTsFileResource.getFile().getParentFile().getName())
@@ -435,13 +441,29 @@ public class StorageEngine implements IService {
}
public boolean deleteTsfile(File deletedTsfile) throws StorageEngineException {
- return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+ if (deletedTsfile.getParentFile() != null) {
+ return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+ }
+ List<String> storageGroupNames = MManager.getInstance().getAllStorageGroupNames();
+ boolean hasDeleted = false;
+ for (String storageGroupName : storageGroupNames) {
+ hasDeleted |= getProcessor(storageGroupName).deleteTsfile(deletedTsfile);
+ }
+ return hasDeleted;
}
- public boolean moveTsfile(File deletedTsfile, File targetDir)
+ public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
throws StorageEngineException, IOException {
- return getProcessor(deletedTsfile.getParentFile().getName())
- .moveTsfile(deletedTsfile, targetDir);
+ if(tsfileToBeMoved.getParentFile() != null){
+ return getProcessor(tsfileToBeMoved.getParentFile().getName())
+ .moveTsfile(tsfileToBeMoved, targetDir);
+ }
+ List<String> storageGroupNames = MManager.getInstance().getAllStorageGroupNames();
+ boolean hasMoved = false;
+ for (String storageGroupName : storageGroupNames) {
+ hasMoved |= getProcessor(storageGroupName).moveTsfile(tsfileToBeMoved, targetDir);
+ }
+ return hasMoved;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a21e6d9..d47645f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1144,7 +1144,8 @@ public class StorageGroupProcessor {
}
/**
- * Load a new tsfile to storage group processor
+ * Load a new tsfile to storage group processor. The mechanism of the sync module will make sure that
+ * there has no file which is overlapping with the new file.
*
* Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
* or unsequence list.
@@ -1156,7 +1157,7 @@ public class StorageGroupProcessor {
* @param newTsFileResource tsfile resource
* @UsedBy sync module.
*/
- public void loadNewTsFile(TsFileResource newTsFileResource)
+ public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws TsFileProcessorException {
File tsfileToBeInserted = newTsFileResource.getFile();
writeLock();
@@ -1177,6 +1178,155 @@ public class StorageGroupProcessor {
}
/**
+ * Load a new tsfile to storage group processor. Tne file may have overlap with other files.
+ *
+ * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+ * or unsequence list.
+ *
+ * Secondly, execute the loading process by the type.
+ *
+ * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @param newTsFileResource tsfile resource
+ * @UsedBy load external tsfile module
+ */
+ public void loadNewTsFile(TsFileResource newTsFileResource)
+ throws TsFileProcessorException {
+ File tsfileToBeInserted = newTsFileResource.getFile();
+ writeLock();
+ mergeLock.writeLock().lock();
+ try {
+ boolean isOverlap = false;
+ int preIndex = -1, subsequentIndex = sequenceFileList.size();
+
+ // check new tsfile
+ outer:
+ for (int i = 0; i < sequenceFileList.size(); i++) {
+ if (sequenceFileList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+ return;
+ }
+ if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+ continue;
+ }
+ boolean hasPre = false, hasSubsequence = false;
+ for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+ if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+ long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+ long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+ long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+ long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+ if (startTime1 > endTime2) {
+ hasSubsequence = true;
+ } else if (startTime2 > endTime1) {
+ hasPre = true;
+ } else {
+ isOverlap = true;
+ break outer;
+ }
+ }
+ }
+ if (hasPre && hasSubsequence) {
+ isOverlap = true;
+ break;
+ }
+ if (!hasPre && hasSubsequence) {
+ subsequentIndex = i;
+ break;
+ }
+ if (hasPre) {
+ preIndex = i;
+ }
+ }
+
+ // loading tsfile by type
+ if (isOverlap) {
+ loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+ unSequenceFileList.size());
+ } else {
+
+ // check whether the file name needs to be renamed.
+ if (subsequentIndex != sequenceFileList.size() || preIndex == -1) {
+ String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
+ subsequentIndex);
+ if (!newFileName.equals(tsfileToBeInserted.getName())) {
+ logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
+ tsfileToBeInserted.getName(), newFileName);
+ newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+ }
+ }
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ getBinarySearchIndex(newTsFileResource));
+ }
+
+ // update latest time map
+ updateLatestTimeMap(newTsFileResource);
+ } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+ logger.error("Failed to append the tsfile {} to storage group processor {}.",
+ tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ throw new TsFileProcessorException(e);
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Get an appropriate filename to ensure the order between files. The tsfile is named after
+ * ({systemTime}-{versionNum}-{mergeNum}.tsfile).
+ *
+ * The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the list
+ * based on the file name and ensure the correctness of the order, so there are three cases.
+ *
+ * 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
+ * name is less than the timestamp in the file name of the first tsfile in the list, then the
+ * file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
+ * to half of the timestamp value in the file name of the first tsfile in the list , and the
+ * version number is the version number in the file name of the first tsfile in the list.
+ *
+ * 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
+ * name is lager than the timestamp in the file name of the last tsfile in the list, then the
+ * file name is legal and the file name is returned directly. Otherwise, the file name is
+ * generated by the system according to the naming rules and returned.
+ *
+ * 3. This file is inserted between two files. If the timestamp in the name of the file satisfies
+ * the timestamp between the timestamps in the name of the two files, then it is a legal name and
+ * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
+ * version number is the version number in the tsfile with a larger timestamp.
+ *
+ * @param tsfileName origin tsfile name
+ * @return appropriate filename
+ */
+ private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex) {
+ long currentTsFileTime = Long
+ .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ long preTime;
+ if (preIndex == -1) {
+ preTime = 0L;
+ } else {
+ String preName = sequenceFileList.get(preIndex).getFile().getName();
+ preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ }
+ if (subsequentIndex == sequenceFileList.size()) {
+ return preTime < currentTsFileTime ? tsfileName
+ : System.currentTimeMillis() + IoTDBConstant.TSFILE_NAME_SEPARATOR + versionController
+ .nextVersion() + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ } else {
+ String subsequenceName = sequenceFileList.get(subsequentIndex).getFile().getName();
+ long subsequenceTime = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ long subsequenceVersion = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
+ if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
+ return tsfileName;
+ } else {
+ return (preTime + ((subsequenceTime - preTime) >> 1)) + IoTDBConstant.TSFILE_NAME_SEPARATOR
+ + subsequenceVersion + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ }
+ }
+ }
+
+ /**
* Get binary search index in @code{sequenceFileList}
*
* @return right index to insert
@@ -1205,7 +1355,7 @@ public class StorageGroupProcessor {
/**
* Update latest time in latestTimeForEachDevice and latestFlushedTimeForEachDevice.
*
- * @UsedBy sync module
+ * @UsedBy sync module, load external tsfile module.
*/
private void updateLatestTimeMap(TsFileResource newTsFileResource) {
for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) {
@@ -1228,7 +1378,7 @@ public class StorageGroupProcessor {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param index the index in sequenceFileList/unSequenceFileList
- * @UsedBy sync module
+ * @UsedBy sync module, load external tsfile module.
*/
private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, int index)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 81e6f5a..146e27f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -71,6 +72,7 @@ import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -187,8 +189,14 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
TsFileResource tsFileResource = new TsFileResource(file);
try {
FileLoaderUtils.checkTsFileResource(tsFileResource);
- //TODO check version and metadata
- } catch (IOException e) {
+ if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file's version is old which needs to be upgraded.",
+ file.getAbsolutePath()));
+ }
+ StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+ } catch (IOException | TsFileProcessorException | StorageEngineException e) {
throw new QueryProcessException(
String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
}
@@ -198,7 +206,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
try {
if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
throw new QueryProcessException(
- String.format("File %s doesn't exists.", plan.getFile().getName()));
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
}
} catch (StorageEngineException e) {
throw new QueryProcessException(
@@ -214,7 +222,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
try {
if (!StorageEngine.getInstance().moveTsfile(plan.getFile(), plan.getTargetDir())) {
throw new QueryProcessException(
- String.format("File %s doesn't exists.", plan.getFile().getName()));
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
}
} catch (StorageEngineException | IOException e) {
throw new QueryProcessException(
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index be2cb61..f6551e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -136,7 +136,7 @@ public class FileLoader implements IFileLoader {
FileLoaderUtils.checkTsFileResource(tsFileResource);
try {
FileLoaderManager.getInstance().checkAndUpdateDeviceOwner(tsFileResource);
- StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+ StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource);
} catch (SyncDeviceOwnerConflictException e) {
LOGGER.error("Device owner has conflicts, so skip the loading file", e);
} catch (TsFileProcessorException | StorageEngineException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java
rename to server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
index c623c80..8e9e222 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.tools.MemEst;
+package org.apache.iotdb.db.tools.memestimation;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
rename to server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
index 0d6c5f4..2df8e9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.tools.MemEst;
+package org.apache.iotdb.db.tools.memestimation;
import io.airlift.airline.Command;
import io.airlift.airline.Option;