You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/12/26 03:45:32 UTC
[incubator-iotdb] 01/01: add load external files feature
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch rel/0.9-load
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2cfa2101f8e0b888985a43ad50a29a1c0d567b07
Author: lta <li...@163.com>
AuthorDate: Thu Dec 26 11:45:04 2019 +0800
add load external files feature
---
.../6-System Tools/8-Load External Tsfile.md | 46 +++++
.../6-System Tools/8-Load External Tsfile.md | 37 ++++
.../org/apache/iotdb/db/sql/parse/TqlLexer.g | 13 ++
.../org/apache/iotdb/db/sql/parse/TqlParser.g | 22 +++
.../org/apache/iotdb/db/engine/StorageEngine.java | 15 +-
.../engine/storagegroup/StorageGroupProcessor.java | 155 +++++++++++++++-
.../org/apache/iotdb/db/metadata/MManager.java | 23 ++-
.../org/apache/iotdb/db/qp/QueryProcessor.java | 1 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 6 +
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 196 +++++++++++++++++----
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +-
.../iotdb/db/qp/logical/sys/LoadFilesOperator.java | 67 +++++++
.../iotdb/db/qp/physical/sys/OperateFilePlan.java | 82 +++++++++
.../iotdb/db/qp/strategy/LogicalGenerator.java | 22 +++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 11 +-
.../iotdb/db/sync/receiver/load/FileLoader.java | 32 +---
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 62 +++++++
.../integration/IoTDBLoadExternalTsfileTest.java | 89 ++++++++++
.../iotdb/db/metadata/MManagerImproveTest.java | 2 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 31 ++++
20 files changed, 837 insertions(+), 77 deletions(-)
diff --git a/docs/Documentation-CHN/UserGuide/6-System Tools/8-Load External Tsfile.md b/docs/Documentation-CHN/UserGuide/6-System Tools/8-Load External Tsfile.md
new file mode 100644
index 0000000..57d58f0
--- /dev/null
+++ b/docs/Documentation-CHN/UserGuide/6-System Tools/8-Load External Tsfile.md
@@ -0,0 +1,46 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# 第6章: 系统工具
+# 加载外部tsfile文件
+<!-- TOC -->
+
+- [第6章: 系统工具](#第6章-系统工具)
+- [加载外部tsfile文件](#加载外部tsfile文件)
+- [介绍](#介绍)
+- [使用方式](#使用方式)
+ - [加载tsfile文件](#加载tsfile文件)
+ - [删除tsfile文件](#删除tsfile文件)
+ - [移出tsfile文件至指定目录](#移出tsfile文件至指定目录)
+
+<!-- /TOC -->
+# 介绍
+加载外部tsfile文件工具允许用户向正在运行中的Apache IoTDB中加载、删除或移出tsfile文件。
+
+# 使用方式
+用户通过Cli工具或JDBC向Apache IoTDB系统发送指定命令实现文件加载的功能。
+## 加载tsfile文件
+加载tsfile文件的指令为:`load <path/dir>`
+
+该指令有两种用法:
+1. 通过指定文件路径(绝对路径)加载单tsfile文件,该文件名称需要符合tsfile的命名规范,即`{systemTime}-{versionNum}-{mergeNum}.tsfile`。若该文件对应的`.resource`文件存在,会被一并加载至Apache IoTDB数据文件的目录和引擎中,否则将通过tsfile文件重新生成对应的`.resource`文件,即加载的tsfile文件所对应的`.resource`文件不是必要的。
+
+2. 通过指定文件夹路径(绝对路径)批量加载文件,待加载的tsfile文件名称需要符合tsfile的命名规范,即`{systemTime}-{versionNum}-{mergeNum}.tsfile`。若待加载文件对应的`.resource`文件存在,则会一并加载至Apache IoTDB数据文件目录和引擎中,否则将通过tsfile文件重新生成对应的`.resource`文件,即加载的tsfile文件所对应的`.resource`文件不是必要的。
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/6-System Tools/8-Load External Tsfile.md b/docs/Documentation/UserGuide/6-System Tools/8-Load External Tsfile.md
new file mode 100644
index 0000000..24eb561
--- /dev/null
+++ b/docs/Documentation/UserGuide/6-System Tools/8-Load External Tsfile.md
@@ -0,0 +1,37 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Chapter 6: System Tools
+# Load External Tsfile Tool
+
+# Introduction
+The load external tsfile tool allows users to load tsfiles, delete a tsfile, or move a tsfile to target directory from the running Apache IoTDB instance.
+
+# Usage
+The user sends specified commands to the Apache IoTDB system through the Cli tool or JDBC to use the tool.
+
+## load tsfiles
+The command to load tsfiles is `load <path/dir>`.
+
+This command has two usages:
+1. Load a single tsfile by specifying a file path (absolute path). The name of the tsfile needs to conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{mergeNum} .tsfile`. If the `.resource` file corresponding to the file exists, it will be loaded into the data directory and engine of the Apache IoTDB. Otherwise, the corresponding `.resource` file will be regenerated from the tsfile file.
+
+2. Load a batch of files by specifying a folder path (absolute path). The name of the tsfiles need to conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{mergeNum} .tsfile`. If the `.resource` file corresponding to the file exists, they will be loaded into the data directory and engine of the Apache IoTDB. Otherwise, the corresponding` .resource` files will be regenerated from the tsfile sfile.
\ No newline at end of file
diff --git a/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlLexer.g b/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlLexer.g
index c49fb6f..0217ec5 100644
--- a/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlLexer.g
+++ b/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlLexer.g
@@ -416,6 +416,11 @@ DATETIME
(('+' | '-') INT ':' INT)?
;
+BOOLEAN_VALUE
+ : T R U E
+ | F A L S E
+ ;
+
EXPONENT : INT ('e'|'E') ('+'|'-')? INT ;
@@ -551,3 +556,11 @@ DURATION
:
(NUM+ (Y|M O|W|D|H|M|S|M S|U S|N S))+
;
+
+FILE
+ : (('a'..'z'| 'A'..'Z')(':')?)* (('\\' | '/')+ PATH_FRAGMENT) +
+ ;
+
+fragment PATH_FRAGMENT
+ : ('a'..'z'|'A'..'Z'|'0'..'9'|'_'|'-'|'.')*
+ ;
\ No newline at end of file
diff --git a/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlParser.g b/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlParser.g
index 734adc2..5944c0b 100644
--- a/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlParser.g
+++ b/server/src/main/antlr3/org/apache/iotdb/db/sql/parse/TqlParser.g
@@ -107,6 +107,7 @@ tokens{
TOK_SHOW;
TOK_DATE_EXPR;
TOK_DURATION;
+ TOK_LOAD_FILES;
}
@header{
@@ -301,6 +302,7 @@ ddlStatement
| mergeStatement
| listStatement
| ttlStatement
+ | operateFileStatement
;
administrationStatement
@@ -755,6 +757,26 @@ rootOrId
;
/*
+**** ****
+************* *************
+TTL Operate file
+************* *************
+**** ****
+*/
+operateFileStatement
+ : loadFiles
+ ;
+loadFiles
+ : K_LOAD path=FILE autoCreateSchema?
+ -> ^(TOK_LOAD_FILES $path)
+ ;
+
+autoCreateSchema
+ :
+ | BOOLEAN_VALUE
+ | BOOLEAN_VALUE INT
+ ;
+/*
****
*************
TTL
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 392fdc1..7176bad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -427,10 +427,21 @@ public class StorageEngine implements IService {
}
}
- public void loadNewTsFile(TsFileResource newTsFileResource)
+ public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws TsFileProcessorException, StorageEngineException {
getProcessor(newTsFileResource.getFile().getParentFile().getName())
- .loadNewTsFile(newTsFileResource);
+ .loadNewTsFileForSync(newTsFileResource);
+ }
+
+ public void loadNewTsFile(TsFileResource newTsFileResource)
+ throws TsFileProcessorException, StorageEngineException, StorageGroupException {
+ Map<String, Long> startTimeMap = newTsFileResource.getStartTimeMap();
+ if (startTimeMap == null || startTimeMap.isEmpty()) {
+ throw new StorageEngineException("Can not get the corresponding storage group.");
+ }
+ String device = startTimeMap.keySet().iterator().next();
+ String storageGroupName = MManager.getInstance().getStorageGroupNameByPath(device);
+ getProcessor(storageGroupName).loadNewTsFile(newTsFileResource);
}
public void deleteTsfile(File deletedTsfile) throws StorageEngineException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b8a342d..c62cb73 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1155,7 +1155,7 @@ public class StorageGroupProcessor {
* @param newTsFileResource tsfile resource
* @UsedBy sync module.
*/
- public void loadNewTsFile(TsFileResource newTsFileResource)
+ public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws TsFileProcessorException {
File tsfileToBeInserted = newTsFileResource.getFile();
writeLock();
@@ -1176,6 +1176,155 @@ public class StorageGroupProcessor {
}
/**
+ * Load a new tsfile to storage group processor. Tne file may have overlap with other files.
+ *
+ * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+ * or unsequence list.
+ *
+ * Secondly, execute the loading process by the type.
+ *
+ * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @param newTsFileResource tsfile resource
+ * @UsedBy load external tsfile module
+ */
+ public void loadNewTsFile(TsFileResource newTsFileResource)
+ throws TsFileProcessorException {
+ File tsfileToBeInserted = newTsFileResource.getFile();
+ writeLock();
+ mergeLock.writeLock().lock();
+ try {
+ boolean isOverlap = false;
+ int preIndex = -1, subsequentIndex = sequenceFileList.size();
+
+ // check new tsfile
+ outer:
+ for (int i = 0; i < sequenceFileList.size(); i++) {
+ if (sequenceFileList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+ return;
+ }
+ if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+ continue;
+ }
+ boolean hasPre = false, hasSubsequence = false;
+ for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+ if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+ long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+ long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+ long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+ long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+ if (startTime1 > endTime2) {
+ hasSubsequence = true;
+ } else if (startTime2 > endTime1) {
+ hasPre = true;
+ } else {
+ isOverlap = true;
+ break outer;
+ }
+ }
+ }
+ if (hasPre && hasSubsequence) {
+ isOverlap = true;
+ break;
+ }
+ if (!hasPre && hasSubsequence) {
+ subsequentIndex = i;
+ break;
+ }
+ if (hasPre) {
+ preIndex = i;
+ }
+ }
+
+ // loading tsfile by type
+ if (isOverlap) {
+ loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+ unSequenceFileList.size());
+ } else {
+
+ // check whether the file name needs to be renamed.
+ if (subsequentIndex != sequenceFileList.size() || preIndex == -1) {
+ String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
+ subsequentIndex);
+ if (!newFileName.equals(tsfileToBeInserted.getName())) {
+ logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
+ tsfileToBeInserted.getName(), newFileName);
+ newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+ }
+ }
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ getBinarySearchIndex(newTsFileResource));
+ }
+
+ // update latest time map
+ updateLatestTimeMap(newTsFileResource);
+ } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+ logger.error("Failed to append the tsfile {} to storage group processor {}.",
+ tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ throw new TsFileProcessorException(e);
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Get an appropriate filename to ensure the order between files. The tsfile is named after
+ * ({systemTime}-{versionNum}-{mergeNum}.tsfile).
+ *
+ * The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the list
+ * based on the file name and ensure the correctness of the order, so there are three cases.
+ *
+ * 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
+ * name is less than the timestamp in the file name of the first tsfile in the list, then the
+ * file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
+ * to half of the timestamp value in the file name of the first tsfile in the list , and the
+ * version number is the version number in the file name of the first tsfile in the list.
+ *
+ * 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
+ * name is lager than the timestamp in the file name of the last tsfile in the list, then the
+ * file name is legal and the file name is returned directly. Otherwise, the file name is
+ * generated by the system according to the naming rules and returned.
+ *
+ * 3. This file is inserted between two files. If the timestamp in the name of the file satisfies
+ * the timestamp between the timestamps in the name of the two files, then it is a legal name and
+ * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
+ * version number is the version number in the tsfile with a larger timestamp.
+ *
+ * @param tsfileName origin tsfile name
+ * @return appropriate filename
+ */
+ private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex) {
+ long currentTsFileTime = Long
+ .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ long preTime;
+ if (preIndex == -1) {
+ preTime = 0L;
+ } else {
+ String preName = sequenceFileList.get(preIndex).getFile().getName();
+ preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ }
+ if (subsequentIndex == sequenceFileList.size()) {
+ return preTime < currentTsFileTime ? tsfileName
+ : System.currentTimeMillis() + IoTDBConstant.TSFILE_NAME_SEPARATOR + versionController
+ .nextVersion() + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ } else {
+ String subsequenceName = sequenceFileList.get(subsequentIndex).getFile().getName();
+ long subsequenceTime = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ long subsequenceVersion = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
+ if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
+ return tsfileName;
+ } else {
+ return (preTime + ((subsequenceTime - preTime) >> 1)) + IoTDBConstant.TSFILE_NAME_SEPARATOR
+ + subsequenceVersion + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ }
+ }
+ }
+
+ /**
* Get binary search index in @code{sequenceFileList}
*
* @return right index to insert
@@ -1237,7 +1386,7 @@ public class StorageGroupProcessor {
case LOAD_UNSEQUENCE:
targetFile =
new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- syncedTsFile.getParentFile().getName() + File.separatorChar + syncedTsFile
+ storageGroupName + File.separatorChar + syncedTsFile
.getName());
tsFileResource.setFile(targetFile);
unSequenceFileList.add(index, tsFileResource);
@@ -1249,7 +1398,7 @@ public class StorageGroupProcessor {
case LOAD_SEQUENCE:
targetFile =
new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- syncedTsFile.getParentFile().getName() + File.separatorChar + syncedTsFile
+ storageGroupName + File.separatorChar + syncedTsFile
.getName());
tsFileResource.setFile(targetFile);
sequenceFileList.add(index, tsFileResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index cc9922a..057099c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1159,18 +1159,26 @@ public class MManager {
}
/**
- * function for getting node by deviceId from cache.
+ * function for getting node by path from cache.
*/
- public MNode getNodeByDeviceIdFromCache(String deviceId) throws CacheException, PathException {
- lock.readLock().lock();
+ public MNode getNodeByPathFromCache(String path) throws CacheException, PathException {
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ return getNodeByPathFromCache(path, conf.isAutoCreateSchemaEnabled(),
+ conf.getDefaultStorageGroupLevel());
+ }
+
+ /**
+ * function for getting node by path from cache.
+ */
+ public MNode getNodeByPathFromCache(String path, boolean autoCreateSchema, int sgLevel) throws CacheException, PathException {
+ lock.readLock().lock();
MNode node = null;
boolean createSchema = false;
boolean setStorageGroup = false;
try {
- node = mNodeCache.get(deviceId);
+ node = mNodeCache.get(path);
} catch (CacheException e) {
- if (!conf.isAutoCreateSchemaEnabled()) {
+ if (!autoCreateSchema) {
throw e;
} else {
createSchema = true;
@@ -1181,14 +1189,13 @@ public class MManager {
if (createSchema) {
if (setStorageGroup) {
try {
- String storageGroupName = getStorageGroupNameByAutoLevel(
- deviceId, conf.getDefaultStorageGroupLevel());
+ String storageGroupName = getStorageGroupNameByAutoLevel(path, sgLevel);
setStorageGroupToMTree(storageGroupName);
} catch (MetadataException | PathException e1) {
throw new CacheException(e1);
}
}
- node = addDeviceIdToMTree(deviceId);
+ node = addDeviceIdToMTree(path);
}
}
return node;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/server/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index ed33976..c0b5ba0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -128,6 +128,7 @@ public class QueryProcessor {
case GRANT_WATERMARK_EMBEDDING:
case REVOKE_WATERMARK_EMBEDDING:
case TTL:
+ case LOAD_FILES:
return operator;
case QUERY:
case UPDATE:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 8479e7a..5a4a19a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -103,6 +103,10 @@ public class SQLConstant {
public static final int TOK_UNSET = 64;
public static final int TOK_SHOW = 65;
+ public static final int TOK_LOAD_FILES = 69;
+ public static final int TOK_REMOVE_FILE = 70;
+ public static final int TOK_MOVE_FILE = 71;
+
public static final Map<Integer, String> tokenSymbol = new HashMap<>();
public static final Map<Integer, String> tokenNames = new HashMap<>();
public static final Map<Integer, Integer> reverseWords = new HashMap<>();
@@ -161,6 +165,8 @@ public class SQLConstant {
tokenNames.put(TOK_SET, "TOK_SET");
tokenNames.put(TOK_UNSET, "TOK_UNSET");
tokenNames.put(TOK_SHOW, "TOK_SHOW");
+
+ tokenNames.put(TOK_LOAD_FILES, "TOK_LOAD_FILES");
}
static {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 94669c1..814b4ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -21,12 +21,17 @@ package org.apache.iotdb.db.qp.executor;
import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -38,8 +43,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
@@ -59,6 +66,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
@@ -66,14 +74,19 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -81,6 +94,8 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
@@ -137,11 +152,99 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
case TTL:
operateTTL((SetTTLPlan) plan);
return true;
+ case LOAD_FILES:
+ operateLoadFiles((OperateFilePlan) plan);
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
}
}
+ private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+ File file = plan.getFile();
+ if (!file.exists()) {
+ throw new QueryProcessException(
+ String.format("File path %s doesn't exists.", file.getPath()));
+ }
+ if (file.isDirectory()) {
+ recursionFileDir(file, plan);
+ } else {
+ loadFile(file, plan);
+ }
+ }
+
+ private void recursionFileDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+ File[] files = curFile.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ recursionFileDir(file, plan);
+ } else {
+ loadFile(file, plan);
+ }
+ }
+ }
+
+ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
+ if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+ return;
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ try {
+ // check file
+ RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+ if (restorableTsFileIOWriter.hasCrashed()){
+ restorableTsFileIOWriter.close();
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+ }
+ Map<String, MeasurementSchema> schemaMap = new HashMap<>();
+ List<ChunkGroupMetaData> chunkGroupMetaData = new ArrayList<>();
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+ reader.selfCheck(schemaMap, chunkGroupMetaData, false);
+ }
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file's version is old which needs to be upgraded.",
+ file.getAbsolutePath()));
+ }
+
+ //create schemas if they doesn't exist
+ if(plan.isAutoCreateSchema()) {
+ createSchemaAutomatically(chunkGroupMetaData, schemaMap, plan.getSgLevel());
+ }
+
+ StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+ } catch (Exception e) {
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+ }
+ }
+
+ private void createSchemaAutomatically(List<ChunkGroupMetaData> chunkGroupMetaDatas,
+ Map<String, MeasurementSchema> knownSchemas, int sgLevel)
+ throws CacheException, QueryProcessException, MetadataException, StorageEngineException {
+ if (chunkGroupMetaDatas.isEmpty()) {
+ return;
+ }
+ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDatas) {
+ String device = chunkGroupMetaData.getDeviceID();
+ MNode node = mManager.getNodeByPathFromCache(device, true, sgLevel);
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ String fullPath =
+ device + IoTDBConstant.PATH_SEPARATOR + chunkMetaData.getMeasurementUid();
+ MeasurementSchema schema = knownSchemas.get(chunkMetaData.getMeasurementUid());
+ if (schema == null) {
+ throw new MetadataException(String
+ .format("Can not get schema if measurement [%s]",
+ chunkMetaData.getMeasurementUid()));
+ }
+ checkPathExists(node, fullPath, schema, true);
+ }
+ }
+ }
+
private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
try {
@@ -221,34 +324,14 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
- MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
+ MNode node = mManager.getNodeByPathFromCache(deviceId);
String[] strValues = insertPlan.getValues();
TSDataType[] dataTypes = new TSDataType[measurementList.length];
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
for (int i = 0; i < measurementList.length; i++) {
- // check if timeseries exists
- if (!node.hasChild(measurementList[i])) {
- if (!conf.isAutoCreateSchemaEnabled()) {
- throw new QueryProcessException(
- String.format("Current deviceId[%s] does not contain measurement:%s",
- deviceId, measurementList[i]));
- }
- try {
- addPathToMTree(deviceId, measurementList[i], strValues[i]);
- } catch (MetadataException e) {
- if (!e.getMessage().contains("already exist")) {
- throw e;
- }
- }
- }
- MNode measurementNode = node.getChild(measurementList[i]);
- if (!measurementNode.isLeaf()) {
- throw new QueryProcessException(
- String.format("Current Path is not leaf node. %s.%s", deviceId,
- measurementList[i]));
- }
+ MNode measurementNode = checkPathExists(node, deviceId, measurementList[i], strValues[i]);
dataTypes[i] = measurementNode.getSchema().getType();
}
@@ -261,12 +344,60 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
}
+ private MNode checkPathExists(MNode node, String deviceId, String measurement, String strValue)
+ throws MetadataException, QueryProcessException, StorageEngineException {
+ // check if timeseries exists
+ if (!node.hasChild(measurement)) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ throw new QueryProcessException(
+ String.format("Current deviceId[%s] does not contain measurement:%s", deviceId, measurement));
+ }
+ try {
+ addPathToMTree(deviceId, measurement, strValue);
+ } catch (MetadataException e) {
+ if (!e.getMessage().contains("already exist")) {
+ throw e;
+ }
+ }
+ }
+ MNode measurementNode = node.getChild(measurement);
+ if (!measurementNode.isLeaf()) {
+ throw new QueryProcessException(
+ String.format("Current Path is not leaf node. %s.%s", deviceId, measurement));
+ }
+ return measurementNode;
+ }
+
+ private void checkPathExists(MNode node, String fullPath, MeasurementSchema schema, boolean autoCreateSchema)
+ throws QueryProcessException, StorageEngineException, MetadataException {
+ // check if timeseries exists
+ String measurement = schema.getMeasurementId();
+ if (!node.hasChild(measurement)) {
+ if (!autoCreateSchema) {
+ throw new QueryProcessException(
+ String.format("Path[%s] does not exist", fullPath));
+ }
+ try {
+ addPathToMTree(fullPath, schema.getType(), schema.getEncodingType(), schema.getCompressor());
+ } catch (MetadataException e) {
+ if (!e.getMessage().contains("already exist")) {
+ throw e;
+ }
+ }
+ }
+ MNode measurementNode = node.getChild(measurement);
+ if (!measurementNode.isLeaf()) {
+ throw new QueryProcessException(
+ String.format("Current Path is not leaf node. %s", fullPath));
+ }
+ }
+
@Override
public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
try {
String[] measurementList = batchInsertPlan.getMeasurements();
String deviceId = batchInsertPlan.getDeviceId();
- MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
+ MNode node = mManager.getNodeByPathFromCache(deviceId);
TSDataType[] dataTypes = batchInsertPlan.getDataTypes();
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
@@ -697,24 +828,25 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
/**
* Add a seriesPath to MTree
*/
- private void addPathToMTree(String deviceId, String measurementId, TSDataType dataType)
+ private void addPathToMTree(String fullPath, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressionType)
throws PathException, MetadataException, StorageEngineException {
- String fullPath = deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId;
- TSEncoding defaultEncoding = getDefaultEncoding(dataType);
- CompressionType defaultCompressor =
- CompressionType.valueOf(TSFileDescriptor.getInstance().getConfig().getCompressor());
boolean result = mManager.addPathToMTree(
- fullPath, dataType, defaultEncoding, defaultCompressor, Collections.emptyMap());
+ fullPath, dataType, encoding, compressionType, Collections.emptyMap());
if (result) {
storageEngine.addTimeSeries(
- new Path(fullPath), dataType, defaultEncoding, defaultCompressor, Collections.emptyMap());
+ new Path(fullPath), dataType, encoding, compressionType, Collections.emptyMap());
}
}
private void addPathToMTree(String deviceId, String measurementId, Object value)
throws PathException, MetadataException, StorageEngineException {
TSDataType predictedDataType = TypeInferenceUtils.getPredictedDataType(value);
- addPathToMTree(deviceId, measurementId, predictedDataType);
+ String fullPath = deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId;
+ TSEncoding encoding = getDefaultEncoding(predictedDataType);
+ CompressionType compressionType =
+ CompressionType.valueOf(TSFileDescriptor.getInstance().getConfig().getCompressor());
+ addPathToMTree(fullPath, predictedDataType, encoding, compressionType);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 8b889a8..706ea6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -74,6 +74,6 @@ public abstract class Operator {
DELETE_ROLE, GRANT_ROLE_PRIVILEGE, REVOKE_ROLE_PRIVILEGE, LIST_USER, LIST_ROLE,
LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS,
GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
- TTL, DELETE_STORAGE_GROUP
+ TTL, DELETE_STORAGE_GROUP, LOAD_FILES, REMOVE_FILE, MOVE_FILE
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/LoadFilesOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/LoadFilesOperator.java
new file mode 100644
index 0000000..e3d0906
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/LoadFilesOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import java.io.File;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
+public class LoadFilesOperator extends RootOperator {
+
+ private File file;
+ private boolean autoCreateSchema;
+ private int sgLevel;
+ private boolean invalid;
+ private String errMsg;
+
+ public LoadFilesOperator(File file, boolean autoCreateSchema, int sgLevel) {
+ super(SQLConstant.TOK_LOAD_FILES);
+ this.file = file;
+ this.autoCreateSchema = autoCreateSchema;
+ this.sgLevel = sgLevel;
+ this.operatorType = OperatorType.LOAD_FILES;
+ }
+
+ public LoadFilesOperator(boolean invalid, String errMsg) {
+ super(SQLConstant.TOK_LOAD_FILES);
+ this.invalid = invalid;
+ this.errMsg = errMsg;
+ this.operatorType = OperatorType.LOAD_FILES;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public boolean isAutoCreateSchema() {
+ return autoCreateSchema;
+ }
+
+ public int getSgLevel() {
+ return sgLevel;
+ }
+
+ public boolean isInvalid() {
+ return invalid;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperateFilePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperateFilePlan.java
new file mode 100644
index 0000000..028f533
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperateFilePlan.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class OperateFilePlan extends PhysicalPlan {
+
+ private File file;
+ private File targetDir;
+ private boolean autoCreateSchema;
+ private int sgLevel;
+
+ public OperateFilePlan(File file, OperatorType operatorType) {
+ super(false, operatorType);
+ this.file = file;
+ }
+
+ public OperateFilePlan(File file, OperatorType operatorType, boolean autoCreateSchema, int sgLevel) {
+ super(false, operatorType);
+ this.file = file;
+ this.autoCreateSchema = autoCreateSchema;
+ this.sgLevel = sgLevel;
+ }
+
+ public OperateFilePlan(File file, File targetDir, OperatorType operatorType) {
+ super(false, operatorType);
+ this.file = file;
+ this.targetDir = targetDir;
+ }
+
+ @Override
+ public List<Path> getPaths() {
+ return null;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public File getTargetDir() {
+ return targetDir;
+ }
+
+ public boolean isAutoCreateSchema() {
+ return autoCreateSchema;
+ }
+
+ public int getSgLevel() {
+ return sgLevel;
+ }
+
+ @Override
+ public String toString() {
+ return "OperateFilePlan{" +
+ "file=" + file +
+ ", targetDir=" + targetDir +
+ ", autoCreateSchema=" + autoCreateSchema +
+ ", sgLevel=" + sgLevel +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 928b21b..4e820de 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -50,6 +50,7 @@ import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_LINEAR;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_LINK;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_LIST;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_LOAD;
+import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_LOAD_FILES;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_PATH;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_PREVIOUS;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_PRIVILEGES;
@@ -73,6 +74,7 @@ import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_UPDATE;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_USER;
import static org.apache.iotdb.db.sql.parse.TqlParser.TOK_WHERE;
+import java.io.File;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.EnumMap;
@@ -104,6 +106,7 @@ import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
+import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
@@ -275,6 +278,9 @@ public class LogicalGenerator {
case TOK_GROUPBY_DEVICE:
((QueryOperator) initializedOperator).setGroupByDevice(true);
return;
+ case TOK_LOAD_FILES:
+ analyzeLoadFile(astNode);
+ return;
default:
throw new QueryProcessException("Not supported TqlParser type " + token.getText());
}
@@ -283,6 +289,22 @@ public class LogicalGenerator {
}
}
+ private void analyzeLoadFile(AstNode astNode){
+ if (!astNode.getChild(2).getChild(0).getText().equalsIgnoreCase("true") && !astNode.getChild(2)
+ .getChild(0).getText().equalsIgnoreCase("false")) {
+ initializedOperator = new LoadFilesOperator(true,
+ "Please check the statement: load [FILE] true/false [storage group level]");
+ } else {
+ boolean createSchemaAutomatically = astNode.getChild(2).getChild(0) == null || Boolean
+ .parseBoolean(astNode.getChild(2).getChild(0).getText());
+ int sgLevel = astNode.getChild(2).getChild(1) == null ? IoTDBDescriptor.getInstance().getConfig()
+ .getDefaultStorageGroupLevel() : Integer.parseInt(astNode.getChild(2).getChild(1).getText());
+ initializedOperator = new LoadFilesOperator(new File(astNode.getChild(1).getText()),
+ createSchemaAutomatically, sgLevel);
+ }
+ }
+
+
private void analyzeTTL(AstNode astNode) throws QueryProcessException {
int tokenType = astNode.getChild(0).getToken().getType();
switch (tokenType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index e6629da..aa40e12 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -46,6 +47,7 @@ import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
+import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
@@ -63,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
@@ -150,7 +153,13 @@ public class PhysicalGenerator {
ShowTTLOperator showTTLOperator = (ShowTTLOperator) operator;
return new ShowTTLPlan(showTTLOperator.getStorageGroups());
}
-
+ case LOAD_FILES:
+ if (((LoadFilesOperator) operator).isInvalid()) {
+ throw new LogicalOperatorException(((LoadFilesOperator) operator).getErrMsg());
+ }
+ return new OperateFilePlan(((LoadFilesOperator) operator).getFile(),
+ OperatorType.LOAD_FILES, ((LoadFilesOperator) operator).isAutoCreateSchema(),
+ ((LoadFilesOperator) operator).getSgLevel());
default:
throw new LogicalOperatorException(operator.getType().toString(), "");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index b0c9df2..525da55 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
@@ -139,10 +140,10 @@ public class FileLoader implements IFileLoader {
return;
}
TsFileResource tsFileResource = new TsFileResource(newTsFile);
- checkTsFileResource(tsFileResource);
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
try {
FileLoaderManager.getInstance().checkAndUpdateDeviceOwner(tsFileResource);
- StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+ StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource);
} catch (SyncDeviceOwnerConflictException e) {
LOGGER.error("Device owner has conflicts, so skip the loading file", e);
} catch (TsFileProcessorException | StorageEngineException e) {
@@ -152,33 +153,6 @@ public class FileLoader implements IFileLoader {
loadLog.finishLoadTsfile(newTsFile);
}
- private void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
- if (!tsFileResource.fileExists()) {
- // .resource file does not exist, read file metadata and recover tsfile resource
- try (TsFileSequenceReader reader = new TsFileSequenceReader(
- tsFileResource.getFile().getAbsolutePath())) {
- TsFileMetaData metaData = reader.readFileMetadata();
- for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
- List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
- .getChunkGroupMetaDataList();
- for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
- for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
- tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
- chunkMetaData.getStartTime());
- tsFileResource
- .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
- }
- }
- }
- }
- // write .resource file
- tsFileResource.serialize();
- } else {
- tsFileResource.deSerialize();
- }
- }
-
private void loadDeletedFile(File deletedTsFile) throws IOException {
if (curType != LoadType.DELETE) {
loadLog.startLoadDeletedFiles();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
new file mode 100644
index 0000000..ecd2852
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+public class FileLoaderUtils {
+
+ private FileLoaderUtils() {
+ }
+
+ public static void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
+ if (!tsFileResource.fileExists()) {
+ // .resource file does not exist, read file metadata and recover tsfile resource
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(
+ tsFileResource.getFile().getAbsolutePath())) {
+ TsFileMetaData metaData = reader.readFileMetadata();
+ for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
+ TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
+ List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+ .getChunkGroupMetaDataList();
+ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
+ chunkMetaData.getStartTime());
+ tsFileResource
+ .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ }
+ }
+ }
+ }
+ // write .resource file
+ tsFileResource.serialize();
+ } else {
+ tsFileResource.deSerialize();
+ }
+ }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
new file mode 100644
index 0000000..8bb8f6d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
@@ -0,0 +1,89 @@
+package org.apache.iotdb.db.integration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBLoadExternalTsfileTest {
+
+ private static IoTDB daemon;
+ private static String[] sqls = new String[]{
+ "SET STORAGE GROUP TO root.vehicle",
+ "SET STORAGE GROUP TO root.test",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "insert into root.vehicle.d0(timestamp,s0) values(10,100)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(12,101,'102')",
+ "insert into root.vehicle.d0(timestamp,s1) values(19,'103')",
+ "insert into root.vehicle.d1(timestamp,s2) values(11,104.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(15,105.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(17,false)",
+ "insert into root.vehicle.d0(timestamp,s0) values(20,1000)"
+ };
+
+ private static final String TIMESTAMP_STR = "Time";
+ private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
+ private static final String STATUS_STR = "root.ln.wf01.wt01.status";
+ private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ daemon.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void LoadNewTsfileTest() throws SQLException {
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("flush");
+ List<TsFileResource> resources = StorageEngine.getInstance().getProcessor("root.vehicle")
+ .getSequenceFileList();
+ for(TsFileResource resource:resources){
+ System.out.println(resource.getFile().getAbsolutePath());
+ }
+ } catch (StorageEngineException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void prepareData() throws SQLException {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+
+
+ for (String sql : sqls) {
+ statement.execute(sql);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
index 0fedf08..91d0d15 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
@@ -215,7 +215,7 @@ public class MManagerImproveTest {
private void doCacheTest(String deviceId, List<String> measurementList)
throws CacheException, PathException {
- MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
+ MNode node = mManager.getNodeByPathFromCache(deviceId);
for (String s : measurementList) {
assertTrue(node.hasChild(s));
MNode measurementNode = node.getChild(s);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index 5284622..1096389 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.File;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.QueryProcessor;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
import org.apache.iotdb.db.query.fill.LinearFill;
@@ -485,4 +487,33 @@ public class PhysicalPlanTest {
Assert.assertEquals(2, dataAuthPlan.getUsers().size());
Assert.assertEquals(OperatorType.REVOKE_WATERMARK_EMBEDDING, dataAuthPlan.getOperatorType());
}
+
+ @Test
+ public void testLoadFiles() throws QueryProcessException, MetadataException {
+ String filePath = "data" + File.separator + "213213441243-1-2.tsfile";
+ String metadata = String.format("load %s", filePath);
+ QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
+ OperateFilePlan plan = (OperateFilePlan) processor.parseSQLToPhysicalPlan(metadata);
+ assertEquals(String.format("OperateFilePlan{file=%s, targetDir= null, operatorType=LOAD_FILES}", filePath), plan.toString());
+ metadata = String.format("load %s true", filePath);
+ processor = new QueryProcessor(new MemIntQpExecutor());
+ plan = (OperateFilePlan) processor.parseSQLToPhysicalPlan(metadata);
+ assertEquals(String.format(
+ "OperateFilePlan{file=data/213213441243-1-2.tsfile, targetDir=null, autoCreateSchema=true, sgLevel=2}",
+ filePath), plan.toString());
+
+ metadata = String.format("load %s false", filePath);
+ processor = new QueryProcessor(new MemIntQpExecutor());
+ plan = (OperateFilePlan) processor.parseSQLToPhysicalPlan(metadata);
+ assertEquals(String.format(
+ "OperateFilePlan{file=data/213213441243-1-2.tsfile, targetDir=null, autoCreateSchema=false, sgLevel=2}",
+ filePath), plan.toString());
+
+ metadata = String.format("load %s true 3", filePath);
+ processor = new QueryProcessor(new MemIntQpExecutor());
+ plan = (OperateFilePlan) processor.parseSQLToPhysicalPlan(metadata);
+ assertEquals(String.format(
+ "OperateFilePlan{file=data/213213441243-1-2.tsfile, targetDir=null, autoCreateSchema=true, sgLevel=3}",
+ filePath), plan.toString());
+ }
}