You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2021/03/02 11:01:50 UTC
[iotdb] 01/01: Index framework: part 2
This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch index_manager_part2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b0639d57bc4e6901fd482e36c0a3a71c7e2387e9
Author: kr11 <3095717866.com>
AuthorDate: Tue Mar 2 19:00:04 2021 +0800
Index framework: part 2
---
docs/zh/SystemDesign/Index/Index.md | 672 +++++++++++++++++++++
.../resources/conf/iotdb-engine.properties | 6 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 33 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 +
.../iotdb/db/engine/flush/MemTableFlushTask.java | 34 +-
.../db/engine/storagegroup/TsFileProcessor.java | 2 +-
.../db/exception/index/QueryIndexException.java | 5 +
...ion.java => UnsupportedIndexFuncException.java} | 10 +-
.../iotdb/db/index/IndexBuildTaskPoolManager.java | 78 +++
.../org/apache/iotdb/db/index/IndexManager.java | 329 ++++++++++
.../apache/iotdb/db/index/IndexManagerMBean.java | 21 +
.../iotdb/db/index/IndexMemTableFlushTask.java | 75 +++
.../org/apache/iotdb/db/index/IndexProcessor.java | 514 ++++++++++++++++
.../iotdb/db/index/algorithm/IoTDBIndex.java | 182 ++++++
.../apache/iotdb/db/index/algorithm/NoIndex.java | 68 +++
.../apache/iotdb/db/index/common/DistSeries.java | 39 ++
.../iotdb/db/index/common/IndexConstant.java | 54 ++
.../apache/iotdb/db/index/common/IndexInfo.java | 137 +++++
.../db/index/common/IndexProcessorStruct.java | 52 ++
.../apache/iotdb/db/index/common/IndexType.java | 56 +-
.../apache/iotdb/db/index/common/IndexUtils.java | 75 +++
.../common/func/CreateIndexProcessorFunc.java | 39 ++
.../iotdb/db/index/common/func/IndexNaiveFunc.java | 34 ++
.../iotdb/db/index/common/math/Randomwalk.java | 44 ++
.../common/math/probability/Probability.java} | 12 +-
.../common/math/probability/UniformProba.java} | 24 +-
.../db/index/feature/IndexFeatureExtractor.java | 106 ++++
.../iotdb/db/index/read/IndexQueryDataSet.java | 41 ++
.../iotdb/db/index/read/QueryIndexExecutor.java | 56 ++
.../optimize/IIndexCandidateOrderOptimize.java | 38 ++
.../read/optimize/NoCandidateOrderOptimizer.java | 21 +
.../apache/iotdb/db/index/router/IIndexRouter.java | 129 ++++
.../iotdb/db/index/router/ProtoIndexRouter.java | 424 +++++++++++++
.../apache/iotdb/db/index/usable/IIndexUsable.java | 98 +++
.../db/index/usable/SubMatchIndexUsability.java | 311 ++++++++++
.../db/index/usable/WholeMatchIndexUsability.java | 79 +++
.../org/apache/iotdb/db/metadata/PartialPath.java | 18 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 43 +-
.../iotdb/db/qp/physical/crud/QueryIndexPlan.java | 9 +
.../iotdb/db/query/executor/IQueryRouter.java | 12 +
.../iotdb/db/query/executor/QueryRouter.java | 12 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +
.../org/apache/iotdb/db/service/ServiceType.java | 5 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 13 +
.../iotdb/db/utils/datastructure/TVList.java | 29 +
.../writelog/recover/TsFileRecoverPerformer.java | 3 +-
.../db/engine/memtable/MemTableFlushTaskTest.java | 3 +-
.../apache/iotdb/db/index/it/NoIndexWriteIT.java | 162 +++++
.../db/index/router/ProtoIndexRouterTest.java | 179 ++++++
.../index/usable/SubMatchIndexUsabilityTest.java | 233 +++++++
.../index/usable/WholeMatchIndexUsabilityTest.java | 55 ++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 8 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 47 ++
54 files changed, 4700 insertions(+), 44 deletions(-)
diff --git a/docs/zh/SystemDesign/Index/Index.md b/docs/zh/SystemDesign/Index/Index.md
new file mode 100644
index 0000000..cf0fba9
--- /dev/null
+++ b/docs/zh/SystemDesign/Index/Index.md
@@ -0,0 +1,672 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# 相似性索引框架文档
+
+[TOC]
+
+## 第1章 任务目标
+
+### 背景
+
+在最近的二十年间,谷歌一系列重要工作的发表 (GFS ,BigTable ,Map-Reduce 等),以及相应的开源框架的兴起 (Hadoop,HBase 等) 推动了大数据技术的兴起和蓬勃发展。以BigTable,HBase,Cassandra 为代表的NoSQL数据库系统对新一代数据库技术产生了巨大的影响。在这一背景下,为了管理气象、医疗、工业物联网等领域产生的大量时间序列数据,时序数据库系统也得到了快速发展 。
+
+然而,现有数据库系统通常仅支持经典查询(点查询,范围查询和带值过滤条件的查询),常见的聚合查询(最大值,最小值,均值,求和等)和少量领域相关操作(如按时间间隔聚合,按时间戳对齐等)等等。
+在传统的关系型数据库系统中,如Oracle和MySQL,往往只集成了几种被广泛使用的索引技术,如用于加速范围查询的B树和加速等值查询的哈希表。很多当前主流的时序数据库系统(如InfluxDB,OpenTSDB、Apache IoTDB)仍不支持时间序列的相似性索引。InfluxDB提出了时间序列索引(Time Series Index,TSI)。但主要用于对海量时间序列的路径、标签等元信息的快速查找。在机器学习领域,开发者提出了许多索引代码库,但主要面向近似检索,同时并未与时序数据库系统进行集成。基于时序数据库系统的索引机制仍然是有待解决的问题。
+
+针对时序数据的相似性索引一直是非常重要的研究课题。研究者们提出了一系列索引结构,致力于解决全序列检索和子序列检索问题。然而,研究者提出的索引技术通常以独立项目的形式存在。在应用于生产场景时,需要全面考虑数据采集/存储、索引构建/查询接口、数据更新、数据容灾等问题。时序数据库系统具有规范化的查询语言、丰富而健壮的读写接口、高效的数据读写性能以及较为完善的容灾策略。将相似性索引技术集成到时序数据库系统中,不仅丰富了数据库系统的功能,同时也会简化索引在面对复杂应用场景时的适配工作,提升索引的易用性,有助于索引技术在生产实践中落地。然而,数据库系统是复杂的,与数据库系统的对接必须综合考虑SQL定义、执行计划、读写流程以及返回格式定义等技术细节。这些复杂的问题制约了索引与�
�据库系统的深度集成。
+
+
+
+#### 目标
+
+使IoTDB支持时间序列相似性检索功能,并集成包括复合模式索引、R树索引在内的多种索引技术。
+
+## 详细设计
+
+### 基本概念
+
+#### 什么是“相似性检索”
+
+相似性搜索是时间序列领域中最重要的方向之一。与IoTDB中现有的查询条件不同,相似性搜索输入一条“序列”作为查询,目的是高效地从数据库中找到一系列与“查询序列”相似的其他。
+
+所谓“相似”:两条序列之间的“相似性”取决于它们的“欧氏距离”或其他距离函数。
+
+如下图形象地展示了如何用“欧氏距离(Euclidean)”和“动态时间规整距离(DTW)”度量两条时间序列的相似性:
+
+![image-20210225015048345](https://user-images.githubusercontent.com/5548915/109539640-849a8b00-7afc-11eb-8c95-b666d6dac2ca.png)
+
+
+
+#### 什么是“特征索引”
+
+在关系型数据库中,一些经典的索引结构,如B树和哈希表,都是对关系表中的属性创建索引。而相似性索引的检索对象是高维序列。由于高维序列存在维度灾难问题,相似性索引需要首先将高维序列转换成低维特征,然后对低维特征创建索引。如果将关系型数据库系统中的B树和哈希表称为“基于属性的索引”,那么时序数据库系统中的相似性索引可以被视为“基于特征的索引”。
+
+数据库系统中的特征索引从逻辑上可以被划分为三层结构,自下而上依次是数据层、特征层和索引层:
+
+ * 数据层:对应着数据空间中的原始序列。 在IoTDB中,系统接收大量传感器数据并将其组织成TsFile文件格式写入磁盘;
+ * 特征层:对应着特征空间中的低维特征。 每条特征的大小通常远小于原始时间序列。 一条特征记录包含两部分信息:特征向量和指向数据文件的指针;
+ * 索引层:对应着索引结构。通过额外的内存或磁盘结构,索引结构可以对特征进行有效的组织。索引结构的大小通常远小于由它组织的特征和原始序列。在索引结构底层的索引项中包含了一个或多个指向特征的指针;
+
+在构造索引时,索引机制首先将数据层中的原始序列转换为特征,然后写入索引结构中。在执行查询时,索引机制将查询序列转换为特征,然后输入索引结构。通过索引结构和特征信息对被检索集进行剪枝,得到候选集,候选集中包含了一系列指向原始数据的指针;然后访问候选集所对应的原始数据,得到最终的查询结果。
+
+### 功能定义
+
+索引框架将满足两类使用者的需求:
+
+* 对数据库管理员和用户来说,索引机制设计了**索引创建、查询和删除的SQL语句**,并设计了与IoTDB其他查询相一致的返回结果格式。
+* 对索引实现者来说,索引机制暴露出一套与索引技术密切相关的接口,包括特征选型、索引插入和索引过滤等等。索引实现者可以在不必关心数据库系统技术细节的情况下,便捷地集成各种索引技术。
+
+#### 用户功能:面向用户和DBA
+
+> 注:面向用户和DBA的代码主要涉及SQL和查询计划的改动,这部分代码已经合并到主分支中,参见[PR-2024(IoTDB-804)](https://github.com/apache/iotdb/pull/2024)。
+
+本节将从两个工业场景中的示例出发,阐述两种检索场景(全序列检索和子序列检索)的使用方式。
+
+##### 全序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540349-4782c880-7afd-11eb-9c67-ae83697f77d3.png)
+
+上图展示了制药工厂在IoTDB中的数据库模式。为了简洁清晰,图中省略了部分与本节无关的信息。工厂包含多个发酵罐(Ferm01、Ferm02等等),每个发酵罐逐批次地进行红霉素发酵工作,每个批次持续时间为7天。对于每个批次(如Ferm02发酵罐的20191017批次),工厂会对发酵过程中的物理量进行记录,包括葡萄糖补糖率(Glu)、二氧化碳释放率(CER)、酸碱度(pH)等等。
+
+分析人员希望为所有发酵罐、所有批次的补糖率序列(Glu)创建基于“PAA特征的R树索引”(记为RTREE_PAA)。创建索引格式如下:
+
+```sql
+CREATE INDEX ON seriesPath
+WITH INDEX=indexName (, key=value)*
+```
+
+除了必须指定索引名之外,还需要传入一系列创建索引所需的参数。在本案例中,创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Ery.*.Glu
+WITH INDEX=RTREE_PAA, PAA_DIM=8
+```
+
+索引类别为RTREE_PAA,即基于PAA特征的R树索引:将时间序列转换为8维PAA特征,然后交由R树组织。
+
+创建索引语句不限制索引参数的数量和格式,以此保证良好的可拓展性。用户提交的参数将由索引注册表记录。当索引实例被加载到内存中时,索引机制会将参数信息传给索引实例。
+
+分析人员希望在所有发酵罐的所有批次中,寻找与某种“补糖策略”最接近的2条补糖率曲线,查询语句如下:
+
+```sql
+SELECT TOP 2 Glu
+FROM root.Ery.*
+WHERE Glu LIKE (0, 120, 20, 80, ..., 120, 100, 80, 0)
+```
+
+通过关键字LIKE,IoTDB的查询处理器将这条查询语句交给索引机制执行。
+为了与IoTDB的经典查询结果格式保持一致,全序列检索的每一条结果单独成列,并按照时间戳顺序对齐。如下图所示,与给定查询序列最接近的两条序列分别是发酵罐Ferm01的20191018批次和发酵罐Ferm02的20191024批次。两条序列按照时间戳对齐。
+
+![image](https://user-images.githubusercontent.com/5548915/109540059-f70b6b00-7afc-11eb-938e-d921a9a5d667.png)
+
+索引机制允许用户删除一个被创建的索引,删除索引格式如下:
+
+```sql
+DROP INDEX indexName ON seriesPath
+```
+
+删除索引的格式非常简单。在本例中,分析人员通过以下语句删除索引:
+
+```sql
+DROP INDEX RTREE_PAA ON root.Ery.*.Glu
+```
+
+##### 子序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540439-65502d80-7afd-11eb-87d6-cf9b7e46ecfd.png)
+
+上图展示了风力发电厂在IoTDB中的数据库模式。风电厂包含多个风机(AZQ01、AZQ02等等),每个风机对风机状态和周围环境进行持续监测。监测传感器包括风速(Speed)、风向(Direction)、风机功率(Power)等等。
+
+分析人员希望为风机AZQ02的风速传感器序列(Speed)创建“等长数据块索引”。创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Wind.AZQ02.Speed
+WITH INDEX=ELB_INDEX, Block_Size=5
+```
+
+创建索引后,分析人员希望在AZQ02风机的风速序列中寻找与"极端运行阵风"相似的子序列。其中极端运行阵风为复合模式,在风速高峰期阈值要求小于4,在其他区间段阈值要求小于2。查询语句如下:
+
+```sql
+SELECT Speed.* FROM root.Wind.AZQ02
+WHERE Speed
+CONTAIN (15, 14, 12, ..., 12, 12, 11) WITH TOLERANCE 2
+CONCAT (10, 20, 25, ..., 24, 14, 8) WITH TOLERANCE 4
+CONCAT (8, 9, 10, ..., 14, 15, 15) WITH TOLERANCE 2
+```
+
+通过 CONTAIN 和 WITH TOLERANCE等关键字,IoTDB的查询处理器将这条查询语句交给索引机制执行,索引机制进一步调用相应的ELB索引。
+
+子序列检索的结果是长序列上的子片段,原本并不存在于数据库模式中。为了与IoTDB的其他查询结果格式保持一致,每一个检索结果单独成列,列名由“传感器序列名”和“子片段起始时间”连缀而成。如下图所示,复合模式的检索结果共两条,分别从时刻2019-10-18 12:30:00.000和2019-10-18 12:30:10.000开始。
+
+![image](https://user-images.githubusercontent.com/5548915/109539955-db07c980-7afc-11eb-8848-41983d393f45.png)
+
+删除索引操作与全序列检索场景类似,不再赘述。
+
+#### 用户功能:面向索引实现者
+
+索引机制致力于屏蔽数据库系统复杂的运行逻辑和消息处理,为开发者提供简洁而友好的索引集成平台。
+
+如果想为IoTDB添加一种新的索引技术,索引实现者需要继承`IoTDBIndex` 类,并在`IndexType` 这一枚举类中添加新的索引类型(“以硬编码的方式向枚举类中添加索引”仍然不够优雅,我们会在未来的版本中用其他方式取代)。
+
+##### IoTDBIndex
+
+`IoTDBIndex`是一个抽象类,一些关键的属性和抽象方法如下:
+
+```java
+// 本索引的特征处理器。所有特征处理器都继承自IndexFeatureExtractor类
+protected IndexFeatureExtractor indexFeatureExtractor;
+
+/**
+ 为本索引使用的特征提取器进行初始化操作
+ previous:如果特征处理器在上一次系统关闭时存储了一些状态信息,这些状态信息会以previous参数传入
+ inQueryMode:用于索引写入还是索引查询。索引可以在两种场景中使用不同的特征提取器。
+ */
+public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+// 从特征提取器中获取最新产生的序列和特征信息,写入索引。
+public abstract boolean buildNext() throws IndexManagerException;
+
+// 系统关闭时,将内存中的索引数据序列化到磁盘
+protected abstract void flushIndex();
+
+// 执行索引查询。
+public abstract QueryDataSet query(
+ Map<String, Object> queryProps,
+ IIndexUsable iIndexUsable,
+ QueryContext context,
+ IIndexRefinePhaseOptimize refinePhaseOptimizer,
+ boolean alignedByTime)
+ throws QueryIndexException;
+
+```
+
+##### IndexType
+
+每种索引都有自己的唯一类型`IndexType`。当索引实现者希望添加一种索引时,需要在`IndexType`中增加一种类型,并为以下函数添加一个新分支:
+
+```java
+public static IndexType deserialize(short i);
+
+public short serialize();
+
+private static IoTDBIndex newIndexByType(PartialPath path, TSDataType tsDataType, String indexDir, IndexType indexType, IndexInfo indexInfo);
+```
+
+##### IndexFeatureExtractor
+
+> 在索引框架的第二次PR中暂不涉及IndexFeatureExtractor的具体实现。
+
+在全序列特征提取器中,特征提取器接一条原始数据作为输入,从中提取出三种信息,以应对索引生成和查询过程中存在的各种需求。三种信息从简单到复杂分别为:
+
+1. L1-序列指针:包含三元组信息:序列路径、起始时间戳和结束时间戳。
+2. L2-规整序列:将不定频、有缺失数据的序列转换为维度固定或时间间隔整齐的序列,供索引处理。
+3. L3-序列特征:基于规整序列进一步计算得到的特征。格式不固定。
+
+以上三种信息分别在索引读写过程中起到不同的作用。序列指针是指向原始数据的指针。
+由于IoTDB会在形成TsFile文件时对原始数据进行编码和压缩,因此无法简单地通过"文件路径+偏移量"来获取原始数据。取而代之的是,根据序列名和起止时间这三元组信息来唯一确定一段原始数据。序列指针会在特征文件和候选集中被使用;
+规整序列是多数索引可以接受的输入形式。在实验条件下,索引结构可以认为时序数据维度固定、时间戳等间隔且没有缺失值。但在实际情况中,序列可能存在微小的时间戳偏移或少量的数据错漏,规整序列对原始序列进行了预处理,去除了上述数据缺陷。
+序列特征是索引对原始序列进行特征提取后的结果。特征提取方法往往是索引技术的关键创新所在。
+
+如果索引实现者希望自定义实现特征提取器 `IndexFeatureExtractor` ,需要实现以下接口:
+
+```java
+// 加入一段原始数据以备处理
+public abstract void appendNewSrcData(TVList newData);
+
+// 加入一段原始数据以备处理,由于代码历史原因,IoTDB在数据写入时生成TVList,而查询时却会得到BatchData
+public abstract void appendNewSrcData(BatchData newData);
+
+// 索引框架会首先询问是否有下一条数据
+public abstract boolean hasNext();
+
+// 如果有,调用本方法产生一个待插入数据。注意,在appendNewSrcData中插入一段原始数据后,可能产生多条待插入数据
+public abstract void processNext();
+
+// 处理了一批数据后,调用本方法来释放已经处理过的数据的资源
+public abstract void clearProcessedSrcData();
+
+// 关闭特征处理器,返回状态数据,用于下一次打开时恢复状态
+public abstract ByteBuffer closeAndRelease() throws IOException;
+
+// 返回 L2-规整数据
+public Object getCurrent_L2_AlignedSequence();
+
+// 返回 L3-特征
+public Object getCurrent_L3_Feature();
+```
+
+
+
+#### 系统参数配置
+
+系统参数如下:
+
+| 参数名 | 类型 | 默认值 | 说明 |
+| ------------------------------------- | ------ | ----------- | ---------------------------------------------------- |
+| enable_index | bool | false | 是否开启索引 |
+| index_root_dir | string | data/index | 所有索引文件的总目录 |
+| concurrent_index_build_thread | int | 系统CPU核数 | 执行索引写入时的最大并行线程数量 |
+| default_index_window_range | int | 10 | 目前未用到,拟删除 |
+| index_buffer_size | long | 134217728 | 目前未用到,拟删除 |
+| max_index_query_result_size | int | 5 | 索引检索结果的最大返回数量 |
+| default_max_size_of_unusable_segments | int | 20 | 可用区间管理器的最大段数,见`SubMatchIndexUsability` |
+
+### 模块设计
+
+下图展示了IoTDB索引机制的模块设计图。索引机制会与IoTDB中的多个模块(绿色框图)产生消息和数据交互(绿色箭头)。例如,在IoTDB系统启动和关闭时会启动或关闭索引机制;当某条序列有新的数据点写入时,IoTDB存储管理器会调用索引机制采取相应操作;当用户发起索引查询时,IoTDB查询处理器会将查询条件转交给索引机制,获取索引查询结果并返回给用户。
+
+![image-20210227025206792](https://user-images.githubusercontent.com/5548915/109539696-92501080-7afc-11eb-8621-a7d4cabcd94c.png)
+
+索引机制内部的各个模块(蓝色框图)也会产生消息和数据交互。索引实现者可以继承指定接口为索引机制添加新的索引实例。通过这些接口,索引机制会调用索引实例完成索引写入和查询等功能(蓝色箭头)。
+
+下面分别介绍各个模块的功能和接口。
+
+#### 特征提取器
+
+对应代码类:`IndexFeatureExtractor` 及其子类。
+
+特征提取器负责对IoTDB的原始序列进行预处理和特征提取。在索引写入阶段,对于全序列检索,特征提取器对整条序列进行特征提取;对于子序列检索,特征提取器首先利用滑动窗口模型将长序列截取为短片段,然后提取特征。特征方法和相关参数在创建索引的时候指定;在索引查询阶段,特征提取器为查询序列提取特征。由于索引技术细节的不同,查询阶段的特征方法可能与写入阶段的特征方法相同或不同。
+系统已经内置了几种较为常见的特征提取器。索引实现者可以直接选用内置的特征提取器,或者实现自定义的新特征提取器。特征提取器的接口详见2.2.2.3节:IndexFeatureExtractor。
+
+#### 索引调度器
+
+首先介绍“索引序列”概念:"索引序列"是一个索引实例覆盖的"序列"或"序列集合"。将索引序列或索引序列集合的所有数据会写入同一个索引实例中。例如,在全序列索引的例子中,`root.Ery.*.Glu ` 就是这个索引的“索引序列”,而在子序列索引的例子中,`root.Wind.AZQ02.Speed` 就是ELB索引的“索引序列”。
+
+每个索引在创建的时候会指定它的“索引序列”,索引序列可能包含一条或多条时间序列。两个“索引序列”之间可能有交集。一个索引序列上可能会创建多种索引。
+
+索引、索引序列和时间序列的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540557-9a5c8000-7afd-11eb-9be3-ede027e81bb2.png)
+
+每个索引序列上可能会创建多种索引实例,索引框架中的每个索引序列对应一个“实力调度器”(IndexProcessor),负责调度该索引序列下的所有索引实例。
+
+总调度器(IndexManager)、实例调度器(IndexProcessor)和索引实例(Index)的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540566-9c264380-7afd-11eb-92ef-c9bcc019f4f0.png)
+
+
+
+**总调度器**:对应代码类:`IndexManager`
+
+IndexManager是索引机制的全局管理者,IoTDB接到的索引创建、删除、查询以及写入均会调用IndexManager。IndexManager会将索引操作分发给相应的IndexProcessor执行。
+
+最初的想法是,"剪枝阶段"由索引实例得到候选集,而"精化阶段"完全由索引框架完成(即对候选集所对应的原始数据进行遍历)。这样的方案是足够通用的。然而,索引技术有各种优化,强制执行上述策略会影响索引的集成。例如,"剪枝阶段"也可能要访问原始数据(RTree),精化阶段也有特定的优化(ELB-Index)。由于已经实现的这两种索引技术均有各自的优化,因此,当前的查询接口将整个查询过程全部交给索引实例完成。
+
+接口如下:
+
+```java
+// 创建索引,目前indexSeriesList中仅包含单条路径。
+public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo);
+
+// 删除索引
+public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType);
+
+// 索引查询。
+public QueryDataSet queryIndex(List<PartialPath> paths, IndexType indexType,
+ Map<String, Object> queryProps, QueryContext context, boolean alignedByTime)
+
+// 关闭操作
+private synchronized void close();
+
+```
+
+
+
+**实例调度器**:对应代码类:` IndexProcessor`
+
+每个IndexProcessor对应一个索引序列,负责管理一个索引序列下的所有索引实例操作。
+
+属性如下:
+
+```java
+/**
+每个索引实例在其关闭时会生成一段状态信息(previousData),用于下一次加载时恢复到当前状态。每个 IndexProcessor 的所有 previousData 会被存储于同一个文件。索引可以管理和存储自身的信息,因此 previousData 不是必须的。但由于特征处理器也会产生 previousData,如果索引不想管理这部分信息,则可以将其抛给 IndexProcessor 管理
+*/
+private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+// 每个索引实例对应一个可用区间管理器(IIndexUsable),每个IndexProcessor的所有IIndexUsable会被存储于同一个文件
+private Map<IndexType, IIndexUsable> usableMap;
+
+// 精化阶段(精化阶段)的优化器。目前并未用到。未来设计详见{@link IIndexCandidateOrderOptimize}.
+private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+```
+
+接口如下:
+
+```java
+// 将已排序的序列写入所有索引中
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 需要等待索引全部刷写完才会返回
+public void endFlushMemTable();
+
+// 将除了NoIndex之外的索引实例刷出到磁盘
+public synchronized void close(boolean deleteFiles) throws IOException;
+
+/**
+根据传入的 indexInfoMap 刷新当前IndexProcessor中的索引实例。当用户进行创建或删除索引,则IndexProcessor中维护的索引实例就过期了,因此需要刷新。
+*/
+public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap);
+
+// 对于乱序数据,目前的设计中直接将数据标记为"索引不可用"
+void updateUnsequenceData(PartialPath path, TVList tvList);
+```
+
+
+
+**索引刷写任务器**,对应`IndexMemTableFlushTask`。
+
+在当前的设计中,索引不会每次有新数据点到来就实时更新,而是在IoTDB的flush时批量写入。当一个存储组执行flush任务时会创建一个flush线程`MemTableFlushTask`。如果系统索引功能开启(enableIndex==true), `MemTableFlushTask` 会将MemTable中的时间序列交给IndexManager写入。考虑到系统代码解耦,在flush过程中的所有索引相关操作封装在一个``IndexMemTableFlushTask`` 中。
+
+不直接写入IndexManager,而是要向IndexManager申请一个``IndexMemTableFlushTask``对象的原因是,IoTDB的存储组的flush任务是独立而并行的,但IndexManager是全局单例的。同时,IoTDB的不同存储组所创建的索引也可能互相不覆盖(即对应不同的IndexProcessor,互相可能并没有并行冲突),因此,每个存储组在flush的时候,向IndexManager申请独立的`IndexMemTableFlushTask` 可以提升并行效率。
+
+接口如下:
+
+```java
+// 插入排序后的序列
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 等待所有IndexProcessor写入完成后再返回
+public void endFlush();
+```
+
+
+
+### 特征文件管理器、索引文件管理器
+
+特征文件管理器指定特征文件的存放位置,索引文件管理器指定索引内存结构刷写到磁盘的文件位置。这两个单元的功能都蕴含在索引调度器中。
+
+### 注册表
+
+对应`IIndexRouter` 及其子类。
+
+`IIndexRouter`的第一个功能是管理索引实例的元数据。当新的索引被创建或删除后,注册表会同步更新。
+
+`IIndexRouter`的第二个功能是将索引操作命令高效地传递到相应的`IndexProcessor`。考虑到`IoTDBIndex`、`IndexProcessor`的映射关系较为复杂且可能在未来的设计中有所改动,`IIndexRouter` 可以将这些映射工作与`IndexManager` 解锁。
+
+接口如下:
+
+```java
+// 添加索引
+boolean addIndexIntoRouter(PartialPath prefixPath, IndexInfo indexInfo, CreateIndexProcessorFunc func, boolean doSerialize) throws MetadataException;
+
+// 删除索引
+boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType);
+
+// 返回给定 indexSeries 下面的索引信息
+Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+// 获取所有 IndexProcessor 及其信息
+Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+// 返回给定 timeSeries 所属的所有 IndexProcessors
+Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+// 序列化
+void serialize(boolean doClose);
+
+// 反序列化
+void deserializeAndReload(CreateIndexProcessorFunc func);
+
+// 返回一个存储组相关的 IIndexRouter 子集。这是为了提高 IIndexRouter 访问的并行性
+IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+// 开始查询
+IndexProcessorStruct startQueryAndCheck(
+ PartialPath partialPath, IndexType indexType, QueryContext context)
+ throws QueryIndexException;
+
+// 结束查询
+void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+```
+
+**ProtoIndexRouter** :是 `IIndexRouter` 的一种简单实现。
+
+子序列索引针对单一序列创建,而全序列索引针对有通配符的一组序列创建,因此 `ProtoIndexRouter` 对两种场景用不同的Map结构管理。
+
+```java
+// 子序列索引,索引序列为全路径
+private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+
+// 全序列索引,索引序列包含通配符
+private Map<PartialPath, IndexProcessorStruct>` wildCardProcessorMap
+```
+
+`IIndexRouter` 的关键功能是为一个序列快速找到其所属的`IndexProcessor`。如果该序列是全路径的,则可以在`fullPathProcessorMap` 中以 $O(1)$ 找到;否则,必须遍历 `wildCardProcessorMap` 中的每一个包含通配符的路径。
+
+
+
+### 可用区间管理器
+
+可用区间管理器用于处理乱序数据。
+大多数时间序列数据点会按照数据时间戳顺序到来,但当乱序数据到来,或者用户手动更新了某段数据后,这一段数据上的索引正确性无法保证。
+对于这些数据,可用区间管理器将其标记为索引不可用。不可用区间的序列将被加入候选集中,接受"精化阶段"的检验。
+
+接口如下:
+
+```java
+// 增加可用区间
+void addUsableRange(PartialPath fullPath, long start, long end);
+
+// 增加不可用区间
+void minusUsableRange(PartialPath fullPath, long start, long end);
+
+// 获取不可用区间,注意,全序列索引和子序列索引的返回格式不同
+Object getUnusableRange();
+
+// 序列化
+void serialize(OutputStream outputStream) throws IOException;
+
+// 反序列化
+void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+```
+
+当前版本针对全序列索引和子序列索引,分别实现了一种可用区间管理器。
+
+**SubMatchIndexUsability**:针对子序列索引的可用区间管理器。
+
+子序列索引的“索引序列”为单一序列,不包含通配符。如果这条长序列上的某一子片段被更改,则将这一片段标记为“索引不可用”,剩余片段则仍为“索引可用”区间。
+
+当前的实现采用一个链表来标记索引不可用的片段。每个链表的节点为 `RangeNode` ,代表一段不可用的时间段。
+
+```java
+class RangeNode {
+ long start;
+ long end;
+ RangeNode next;
+}
+```
+
+`SubMatchIndexUsability` 的构造函数有两个:
+
+```java
+/**
+ * 构造函数
+ * @param maxSizeOfUsableSegments 不可用区间的最大段数
+ * @param initAllUsable 如果为true,最初时间段均为“索引可用”,如果为false,最初均为“索引不可用”
+ */
+SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+ ...
+}
+```
+
+当前实现中对于 `RangeNode` 链表的访问和更新是线性遍历的,然而用户可以指定不可用区间的最大段数 $M$ ,因此链表的访问和更新复杂度可以控制在 `O(M)` ,即常数量级。当不可用区间的段数增加到上限时,会将新的不可用区间与较近的区间合并,从而控制区间段数。这样会使得“不可用区间”范围扩大,但将“可用区间”标记为“不可用区间”仅会让一些“被剪枝”的序列进入精化阶段,而不会造成漏解(即,将本来正确的结果错误地排除掉),因此 $M$ 参数并不会影响索引查询的正确性。
+
+`getUnusableRange()` 函数返回一系列不可用区间的时间过滤器:`List<Filter>` 。
+
+例如,在序列 `root.Wind.AZQ02.Speed` 上创建子序列索引。
+
+1. `initAllUsable = false,最初 整个区间段均为“索引不可用”;`maxSizeOfUsableSegments=2` 最多允许两个索引不可用片段存在;
+2. 增加可用片段 $[5,7]$ ,不可用区间变为 $[MIN,4] \cup [8,MAX]$ ;
+3. 增加可用片段 $[2,3]$ ,不可用区间应当分裂为 $[1,2] \cup [4,4]\cup [8,10]$ ,然而不可用区间超过了上限,因此停止分裂,仍然保持 $[MIN,4] \cup [8,MAX]$
+
+**WholeMatchIndexUsability**:针对全序列索引的可用区间管理器。
+
+全序列索引的“索引序列”包含通配符,每一条符合规则的时间序列作为一个整体插入索引。因此,对于一条序列的任何更改都会使整条序列变为“索引不可用”。因此,本类采用一个集合 `unusableSeriesSet` 来标记索引不可用的序列。
+
+例如,在索引序列 `root.Ery.*.Glu ` 之上创建全序列索引。当某条序列 `root.Ery.01.Glu` 被更改,则将其加入集合 `unusableSeriesSet` ,该条序列被标记为“索引不可用”。
+
+`getUnusableRange()` 函数返回被标记为“索引不可用”的序列的集合 `Set<PartialPath>` 。
+
+### 索引查询处理器
+
+查询处理模块负责执行索引相关的查询。包括索引查询执行器、查询优化器,在查询时也会用到可用区间管理器。
+
+**查询执行器**:对应类`QueryIndexExecutor` 。
+
+尽管我们可以从相似性索引的查询过程中提取共性,将其分为“剪枝阶段”和“精化阶段”,并且精化阶段可以由索引框架结果。然而,许多索引技术(包括当前实现的两种索引:RTree和ELB索引)会对“剪枝阶段”和“精化阶段”结合起来优化,强制接管“精化阶段”可能会影响索引查询效率。因此,现阶段的查询处理器直接调用`IndexManager#queryIndex` 函数。
+
+**查询结果类**:对应类 `IndexQueryDataSet` 继承自 `ListDataSet`。
+
+
+
+### 索引接口
+
+对应`IoTDBIndex` 。
+
+索引接口是索引实现者唯一需要实现的部分,接口包括指定特征提取器、写入索引、序列化索引、索引查询等等,详细阐述参见2.2.2.1节。
+
+## 流程设计
+
+### 索引写入流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540629-b6602180-7afd-11eb-8dbe-e38c91921a7e.png)
+
+
+
+上图展现了索引机制的写入流程:
+
+* 当用户向数据库系统写入新的数据后,数据库系统通知索引总调度器 `IndexManger`。
+* 总调度器首先向注册表进行校验,判断数据所在的序列是否创建过索引。
+* 当校验成功后,总调度器将数据传递到相应的索引实例调度器,图中省略这一步骤。
+* 调度器首先使用特征提取器将新数据转换为特征,然后由索引实例消费掉。
+* 当数据写入索引实例后,更新可用区间管理器。至此,完成整个写入流程。
+
+在当前的实现中,仅当触发IoTDB的刷写操作时(`MemTableFlushTask`),索引机制才触发索引的批量写入操作。这样带来两个好处:首先,在批量写入的情况下,
+传感器将多个数据点积攒成序列,方便了特征提取器的处理;其次,考虑到工业物联网数据的乱序情况,IoTDB会将一段时间内的数据进行排序再传给索引机制,这样减少了乱序数据给索引带来的负面影响。
+
+延迟写入索引并不会影响索引查询的正确性。对于已经写入IoTDB但尚未触发刷写操作的数据,索引机制中的可用区间管理器会将其标记为"索引不可用",因此,这部分数据会直接进入精化阶段进行计算,不会被遗漏。
+
+### 索引查询流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540634-b95b1200-7afd-11eb-800a-71b3b11ce16d.png)
+
+当用户发起索引查询时,索引机制为其创建单独的线程并执行查询。相似性索引的一般查询流程如下:
+
+1. 用户发起索引查询时,IoTDB会为其创建一个单独的线程并执行整个查询流程。这一执行线程调用索引机制并传入查询条件。索引机制首先调用注册表,判断该序列上是否创建过索引、索引是否支持这一查询条件。
+2. 校验成功后,`IndexManager` 根据查询条件中指定的查询目标,将执行线程和查询条件传递到相应的索引实例调度器`IndexProcessor` 。
+3. 执行线程调用特征提取器 `FeatureExtractor`,对查询序列提取特征。然后将查询特征及其他查询条件输入索引实例 `IoTDBIndex` 。
+ 1. 执行过滤阶段,并返回候选集,候选集中包含了一系列指向原始数据的指针。至此,查询处理模块的**过滤单元**完成。
+ 2. 在进入精化阶段前,执行线程首先调用可用区间管理器,将索引不可用区间中的序列加入候选集中,避免乱序数据造成索引错误剪枝。
+ 3. 然后调用查询优化器,对候选集的访问顺序进行重排序。
+ 4. 接下来,执行线程访问IoTDB,读取候选集所指向的原始数据并完成精确计算。
+ 5. 至此,查询处理模块的精化单元完成,返回结果。
+
+上述流程的第三步中,索引框架接管了精化阶段。但如果索引实例有更多的优化,也可以完全接管查询过程并直接返回结果。反映到模块调用上,即 `IndexProcessor` 直接调用 `IoTDBIndex#query` 函数,返回 `QueryDataSet` 。
+
+### 索引更新与删除
+
+索引更新主要涉及两个方面。首先,当IoTDB接收乱序数据后,索引可用区间管理器会相应更新,从而保证后续查询的正确性。其次,当IoTDB完成乱序数据整理和合并后,索引框架将对索引发起重建,这一步在未来实现。
+
+当索引被删除后,索引目录下的文件将被删除。
+
+
+
+## 文件结构
+
+![image](https://user-images.githubusercontent.com/5548915/109540601-a9433280-7afd-11eb-8bdb-3f77181c1637.png)
+
+上图左图是索引框架的文件组织结构。
+
+* 总文件夹index下包括数据文件夹和元数据文件夹;
+ * 元数据文件夹中目前仅包括注册表文件夹;
+ * 数据文件夹下,每个索引序列(对应一个 `IndexProcessor` )都对应一个文件夹;
+ * 索引序列文件夹会创建一个状态文件和可用区间管理器文件;
+ * 每个索引序列文件夹下可能创建了多个索引实例;
+
+上图右图是一个例子。
+
+
+
+## 并行性分析(TODO)
+
+## 复杂度分析(TODO)
+
+量化,时间复杂度,内存/磁盘空间复杂度。
+
+
+
+## 问题讨论
+
+### 索引读写并发能力-锁粒度
+
+为每一个索引实例分配一个读写锁,并由IndexProcessor控制索引的读写。
+
+由于原始数据是分批刷写磁盘的,因此索引也需要支持批量更新。这意味着索引不能在创建索引之前就看到全部原始数据。一些需要基于全局数据确定参数的索引的性能可能会受到影响(如VaFile和基于Kmeans的倒排索引)。
+
+目前为每个索引维护了读写锁(`org.apache.iotdb.db.index.IndexProcessor#indexLockMap`)。允许对索引的并发查询,但索引的写入和查询是互斥的。这一设计有两点值得讨论:
+
+1. 查询并发:并发查询提升了查询效率,但要求索引是查询并发安全的;
+2. 读写互斥:这意味着IoTDB刷写操作和索引查询之间会互相阻塞,无疑降低了效率。然而要取消这一限制,则要求索引是读写并发安全的。
+
+根据开发者对一些开源索引代码的调研,索引通常可以满足查询并发,但较难满足读写互斥,因此设计了读写锁机制。但是,值得讨论的是,是否对集成到IoTDB的索引代码提出更高的要求,从而提升IoTDB的效率呢?
+
+### 索引更新删除能力-可用区间
+
+TODO
+
+
+
+# 未来规划
+
+预计在第三次PR中提交的模块:
+
+* 特征提取器的实现 `IndexFeatureExtractor` :包括全序列检索和子序列检索中一些常见的特征提取方法,例如全序列检索用到的PAA特征、子序列检索中用到的两种滑动窗口模型等等;
+* 索引算法:基于以上特征提取器,实现了两种基本的检索算法:基于PAA特征的R树索引和等长特征块索引(ELB,回答子序列检索)
+
+留待未来实现的模块如下:
+
+* 统计模块IndexStats:几乎所有相似性索引(甚至可以推广到所有索引)都会关注一些共通的统计量,例如:CPU时间、磁盘访问时间、内存占用、磁盘占用、查询剪枝率、近似查询中的查准率和查全率等等。统计模块为其提供全面而标准的工具函数,用于索引性能评估和不同索引之间的性能比较;
+* 候选集顺序优化器 `IIndexCandidateOrderOptimize`:相似性索引剪枝后得到候选集列表。由于索引并未考虑IoTDB的数据组织方式,因此候选集的顺序可能不是最优的。候选集顺序优化器对此进行优化;
+* 其他索引技术
+
+
+
+# 测试验证(TODO)
+
+## 测试目标
+
+正确性测试或性能(对比)测试
+
+## 测试方案
+
+\- 测试系统
+
+\- 被测系统
+
+\- 测试环境
+
+\- 负载描述
+
+## 测试结果
+
+## 测试结论
\ No newline at end of file
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index eb527ae..476d547 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -657,6 +657,12 @@ default_index_window_range=10
# buffer parameter for index processor.
index_buffer_size=134217728
+# the max returned size of index query result
+max_index_query_result_size=5
+
+# the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+default_max_size_of_unusable_segments=20
+
# whether enable data partition. If disabled, all data belongs to partition 0
enable_partition=false
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 030fde2..4e44ef8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.LoadConfigurationException;
+import org.apache.iotdb.db.index.IndexProcessor;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -247,11 +248,11 @@ public class IoTDBConfig {
/**
* If we enable the memory-control mechanism during index building , {@code indexBufferSize}
* refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
- * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
- * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
- * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
- * will be flushed to the disk file. As a result, data in one series may be divided into more than
- * one part and indexed separately.
+ * one {@linkplain IndexProcessor IndexProcessor} share a total common buffer size. With the
+ * memory-control mechanism, the occupied memory of all raw data and index structures will be
+ * counted. If the memory buffer size reaches this threshold, the indexes will be flushed to the
+ * disk file. As a result, data in one series may be divided into more than one part and indexed
+ * separately.
*/
private long indexBufferSize = 128 * 1024 * 1024L;
@@ -261,6 +262,10 @@ public class IoTDBConfig {
*/
private int defaultIndexWindowRange = 10;
+ /**
+ * the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+ */
+ private int defaultMaxSizeOfUnusableSegments = 20;
/** index directory. */
private String indexRootFolder = "data" + File.separator + "index";
@@ -637,6 +642,8 @@ public class IoTDBConfig {
/** the number of virtual storage groups per user-defined storage group */
private int virtualStorageGroupNum = 1;
+ private int maxIndexQueryResultSize = 5;
+
public IoTDBConfig() {
// empty constructor
}
@@ -2035,6 +2042,22 @@ public class IoTDBConfig {
this.defaultIndexWindowRange = defaultIndexWindowRange;
}
+ public int getMaxIndexQueryResultSize() {
+ return this.maxIndexQueryResultSize;
+ }
+
+ public void setMaxIndexQueryResultSize(int maxIndexQueryResultSize) {
+ this.maxIndexQueryResultSize = maxIndexQueryResultSize;
+ }
+
+ public int getDefaultMaxSizeOfUnusableSegments() {
+ return defaultMaxSizeOfUnusableSegments;
+ }
+
+ public void setDefaultMaxSizeOfUnusableSegments(int defaultSizeOfUsableSegments) {
+ this.defaultMaxSizeOfUnusableSegments = defaultSizeOfUsableSegments;
+ }
+
public int getVirtualStorageGroupNum() {
return virtualStorageGroupNum;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ea507f9..f1a86b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -436,6 +436,18 @@ public class IoTDBDescriptor {
conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setMaxIndexQueryResultSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_index_query_result_size",
+ Integer.toString(conf.getMaxIndexQueryResultSize()))));
+
+ conf.setDefaultMaxSizeOfUnusableSegments(
+ Integer.parseInt(
+ properties.getProperty(
+ "default_max_size_of_unusable_segments",
+ Integer.toString(conf.getDefaultMaxSizeOfUnusableSegments()))));
+
conf.setmManagerCacheSize(
Integer.parseInt(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..21b0e3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.IndexMemTableFlushTask;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -58,7 +61,9 @@ public class MemTableFlushTask {
? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
: new LinkedBlockingQueue<>();
+ private final boolean enabledIndex;
private String storageGroup;
+ private final boolean sequence;
private IMemTable memTable;
@@ -71,7 +76,7 @@ public class MemTableFlushTask {
* @param storageGroup current storage group
*/
public MemTableFlushTask(
- IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+ IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup, boolean sequence) {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
@@ -81,6 +86,8 @@ public class MemTableFlushTask {
"flush task of Storage group {} memtable is created, flushing to file {}.",
storageGroup,
writer.getFile().getName());
+ this.enabledIndex = IoTDBDescriptor.getInstance().getConfig().isEnableIndex();
+ this.sequence = sequence;
}
/** the function for flushing memtable. */
@@ -100,6 +107,14 @@ public class MemTableFlushTask {
}
long start = System.currentTimeMillis();
long sortTime = 0;
+ IndexMemTableFlushTask indexFlushTask = null;
+ if (enabledIndex) {
+ try {
+ indexFlushTask = IndexManager.getInstance().getIndexMemFlushTask(storageGroup, sequence);
+ } catch (Exception e) {
+ LOGGER.error("meet Exception in getIndexMemFlushTask, not affect the memtable flushing", e);
+ }
+ }
// for map do not use get(key) to iteratate
for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
@@ -114,6 +129,16 @@ public class MemTableFlushTask {
TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.put(new Pair<>(tvList, desc));
+ if (enabledIndex && indexFlushTask != null) {
+ try {
+ String deviceId = memTableEntry.getKey();
+ String measurementId = iWritableMemChunkEntry.getKey();
+ indexFlushTask.buildIndexForOneSeries(new PartialPath(deviceId, measurementId), tvList);
+ } catch (Exception e) {
+ LOGGER.error(
+ "meet Exception in buildIndexForOneSeries, not affect the memtable flushing", e);
+ }
+ }
}
encodingTaskQueue.put(new EndChunkGroupIoTask());
@@ -133,6 +158,13 @@ public class MemTableFlushTask {
}
ioTaskFuture.get();
+ if (enabledIndex && indexFlushTask != null) {
+ try {
+ indexFlushTask.endFlush();
+ } catch (Exception e) {
+ LOGGER.error("meet Exception in endFlush, not affect the memtable flushing", e);
+ }
+ }
try {
writer.writePlanIndices();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..6a73393 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -775,7 +775,7 @@ public class TsFileProcessor {
try {
writer.mark();
MemTableFlushTask flushTask =
- new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+ new MemTableFlushTask(memTableToFlush, writer, storageGroupName, sequence);
flushTask.syncFlushMemTable();
} catch (Exception e) {
if (writer == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
index 7b8b8cf..89bc2fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
@@ -18,6 +18,7 @@
package org.apache.iotdb.db.exception.index;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
public class QueryIndexException extends QueryProcessException {
@@ -26,4 +27,8 @@ public class QueryIndexException extends QueryProcessException {
public QueryIndexException(String message, int errorCode) {
super(message, errorCode);
}
+
+ public QueryIndexException(String message) {
+ super(message, TSStatusCode.INDEX_QUERY_ERROR.getStatusCode());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
index 7b8b8cf..78ba38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
@@ -17,13 +17,13 @@
*/
package org.apache.iotdb.db.exception.index;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
-public class QueryIndexException extends QueryProcessException {
+public class UnsupportedIndexFuncException extends QueryIndexException {
- private static final long serialVersionUID = 9019233783504576296L;
+ private static final long serialVersionUID = 4185676677334759039L;
- public QueryIndexException(String message, int errorCode) {
- super(message, errorCode);
+ public UnsupportedIndexFuncException(String message) {
+ super(message, TSStatusCode.UNSUPPORTED_INDEX_FUNC_ERROR.getStatusCode());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
new file mode 100644
index 0000000..1104c99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A pool to control the max size of concurrent threads for index insertion at the same time. */
+public class IndexBuildTaskPoolManager extends AbstractPoolManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildTaskPoolManager.class);
+
+ private IndexBuildTaskPoolManager() {
+ int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentIndexBuildThread();
+ pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.INDEX_SERVICE.getName());
+ }
+
+ public static IndexBuildTaskPoolManager getInstance() {
+ return IndexBuildTaskPoolManager.InstanceHolder.instance;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOGGER;
+ }
+
+ @Override
+ public String getName() {
+ return "index building task";
+ }
+
+ @Override
+ public void start() {
+ if (pool == null) {
+ int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentIndexBuildThread();
+ pool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.INDEX_SERVICE.getName());
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (pool != null) {
+ close();
+ pool = null;
+ }
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {
+ // allowed to do nothing
+ }
+
+ private static IndexBuildTaskPoolManager instance = new IndexBuildTaskPoolManager();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
new file mode 100644
index 0000000..053596b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_DATA_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.META_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.ROUTER_DIR;
+
+/**
+ * IndexManager is the global manager of index framework, which will be called by IoTDB when index
+ * creation, deletion, query and insertion. IndexManager will pass the index operations to the
+ * corresponding IndexProcessor.
+ */
+public class IndexManager implements IndexManagerMBean, IService {
+
+ private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+ /**
+ * The index root directory. All index metadata files, index data files are stored in this
+ * directory.
+ */
+ private final String indexRootDirPath;
+
+ private final String indexMetaDirPath;
+ private final String indexRouterDir;
+ private final String indexDataDirPath;
+ private final IIndexRouter router;
+
+ /** A function interface to construct an index processor. */
+ private CreateIndexProcessorFunc createIndexProcessorFunc;
+
+ private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private IndexManager() {
+ indexRootDirPath = DirectoryManager.getInstance().getIndexRootFolder();
+ indexMetaDirPath = indexRootDirPath + File.separator + META_DIR_NAME;
+ indexRouterDir = indexMetaDirPath + File.separator + ROUTER_DIR;
+ indexDataDirPath = indexRootDirPath + File.separator + INDEX_DATA_DIR_NAME;
+ createIndexProcessorFunc =
+ (indexSeries, indexInfoMap) ->
+ new IndexProcessor(
+ indexSeries,
+ IndexUtils.removeIllegalStarInDir(indexDataDirPath + File.separator + indexSeries));
+ router = IIndexRouter.Factory.getIndexRouter(indexRouterDir);
+ }
+
+ /**
+ * Given an IndexSeries, return its feature file path (unused currently).
+ *
+ * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+ * @param indexType the type of index
+ * @return the feature directory path for this index.
+ */
+ private String getFeatureFileDirectory(PartialPath path, IndexType indexType) {
+ return IndexUtils.removeIllegalStarInDir(
+ indexDataDirPath + File.separator + path.getFullPath() + File.separator + indexType);
+ }
+
+ /**
+ * Given an IndexSeries, return its data file path (unused currently).
+ *
+ * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+ * @param indexType the type of index
+ * @return the feature directory path for this index.
+ */
+ private String getIndexDataDirectory(PartialPath path, IndexType indexType) {
+ return getFeatureFileDirectory(path, indexType);
+ }
+
+ /**
+ * Execute the index creation. Due to the complex mapping relationship between the time series and
+ * the index instances, we encapsulate the index metadata management into the router {@link
+ * IIndexRouter} for stability.
+ *
+ * @param indexSeriesList a singleton list up to now.
+ * @param indexInfo the index information.
+ */
+ public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo)
+ throws MetadataException {
+ if (!indexSeriesList.isEmpty()) {
+ router.addIndexIntoRouter(indexSeriesList.get(0), indexInfo, createIndexProcessorFunc, true);
+ }
+ }
+
+ /**
+ * Execute the index deletion.
+ *
+ * @param indexSeriesList a singleton list up to now.
+ * @param indexType the index type to be dropped.
+ */
+ public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType)
+ throws MetadataException, IOException {
+ if (!indexSeriesList.isEmpty()) {
+ router.removeIndexFromRouter(indexSeriesList.get(0), indexType);
+ }
+ }
+
+ /**
+ * When the storage group flushes,we construct {@link IndexMemTableFlushTask} for index insertion。
+ *
+ * <p>So far, the index insertion is triggered only when Memtables flush. A storage group contains
+ * several series and each of these series may create several indexes. In other words, one storage
+ * group may correspond to several {@linkplain IndexProcessor}.
+ *
+ * <p>This method return a router to find all {@linkplain IndexProcessor} related to this storage
+ * group.
+ *
+ * @param storageGroupPath the path of the storage group
+ * @param sequence true if it's sequence data, otherwise it's unsequence data
+ * @return a router to find all {@linkplain IndexProcessor} related to this storage group, and
+ * other informations
+ * @see IndexMemTableFlushTask
+ */
+ public IndexMemTableFlushTask getIndexMemFlushTask(String storageGroupPath, boolean sequence) {
+ // StorageGroupPath may contain file separator, we put a temp patch here.
+ storageGroupPath = storageGroupPath.replace(File.separatorChar, '/');
+ String realStorageGroupPath = storageGroupPath.split("/")[0];
+ IIndexRouter sgRouter = router.getRouterByStorageGroup(realStorageGroupPath);
+ return new IndexMemTableFlushTask(sgRouter, sequence);
+ }
+
+ /**
+ * Index query.
+ *
+ * <p>The initial idea is that index instances only process the "pruning phase" to prune some
+ * negative items and return a candidate list, the framework finishes the rest (so-called
+ * "post-processing phase" or "refinement phase", to query the raw time series by the candidate
+ * list and then to verified which series in candidate list are real positive results).
+ *
+ * <p>The above design is common enough for all of similarity index methods. However, index
+ * technology has various optimizations, and enforcing the above strategy will affect the freedom
+ * of index integration. The two implemented indexes (ELB index and RTree index) have their own
+ * optimizations which combine the pruning phase and post-processing phase. Therefore, in current
+ * version, the query process is entirely passed to the index instance.
+ *
+ * @param paths the series to be queried.
+ * @param indexType the index type to be queried.
+ * @param queryProps the properties of this query.
+ * @param context the query context.
+ * @param alignedByTime whether aligned index result by timestamps.
+ * @return the index query result.
+ */
+ public QueryDataSet queryIndex(
+ List<PartialPath> paths,
+ IndexType indexType,
+ Map<String, Object> queryProps,
+ QueryContext context,
+ boolean alignedByTime)
+ throws QueryIndexException, StorageEngineException {
+ if (paths.size() != 1) {
+ throw new QueryIndexException("Index allows to query only one path");
+ }
+ PartialPath queryIndexSeries = paths.get(0);
+ IndexProcessorStruct indexProcessorStruct =
+ router.startQueryAndCheck(queryIndexSeries, indexType, context);
+ List<StorageGroupProcessor> list = indexProcessorStruct.addMergeLock();
+ try {
+ return indexProcessorStruct.processor.query(indexType, queryProps, context, alignedByTime);
+ } finally {
+ StorageEngine.getInstance().mergeUnLock(list);
+ router.endQuery(indexProcessorStruct.processor.getIndexSeries(), indexType, context);
+ }
+ }
+
+ private void prepareDirectory() {
+ File rootDir = IndexUtils.getIndexFile(indexRootDirPath);
+ if (!rootDir.exists()) {
+ rootDir.mkdirs();
+ }
+ File routerDir = IndexUtils.getIndexFile(indexRouterDir);
+ if (!routerDir.exists()) {
+ routerDir.mkdirs();
+ }
+ File metaDir = IndexUtils.getIndexFile(indexMetaDirPath);
+ if (!metaDir.exists()) {
+ metaDir.mkdirs();
+ }
+ File dataDir = IndexUtils.getIndexFile(indexDataDirPath);
+ if (!dataDir.exists()) {
+ dataDir.mkdirs();
+ }
+ }
+
+ private void deleteDroppedIndexData() throws IOException, IllegalPathException {
+ for (File processorDataDir :
+ Objects.requireNonNull(IndexUtils.getIndexFile(indexDataDirPath).listFiles())) {
+ String processorName = processorDataDir.getName();
+ Map<IndexType, IndexInfo> infos =
+ router.getIndexInfosByIndexSeries(new PartialPath(processorName));
+ if (infos.isEmpty()) {
+ FileUtils.deleteDirectory(processorDataDir);
+ } else {
+ for (File indexDataDir : Objects.requireNonNull(processorDataDir.listFiles())) {
+ if (indexDataDir.isDirectory()
+ && !infos.containsKey(IndexType.valueOf(indexDataDir.getName()))) {
+ FileUtils.deleteDirectory(indexDataDir);
+ }
+ }
+ }
+ }
+ }
+
+ /** close the index manager. */
+ private synchronized void close() {
+ router.serialize(true);
+ }
+
+ @Override
+ public void start() throws StartupException {
+ if (!config.isEnableIndex()) {
+ return;
+ }
+ IndexBuildTaskPoolManager.getInstance().start();
+ try {
+ JMXService.registerMBean(this, ServiceType.INDEX_SERVICE.getJmxName());
+ prepareDirectory();
+ router.deserializeAndReload(createIndexProcessorFunc);
+ deleteDroppedIndexData();
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ }
+
+ /**
+ * As IoTDB has no normal shutdown mechanism, this function will not be called. To ensure the
+ * information safety, The router needs to serialize index metadata every time createIndex or
+ * dropIndex is called.
+ */
+ @Override
+ public void stop() {
+ if (!config.isEnableIndex()) {
+ return;
+ }
+ close();
+ IndexBuildTaskPoolManager.getInstance().stop();
+ JMXService.deregisterMBean(ServiceType.INDEX_SERVICE.getJmxName());
+ }
+
+ public static IndexManager getInstance() {
+ return InstanceHolder.instance;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.INDEX_SERVICE;
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {}
+
+ private static IndexManager instance = new IndexManager();
+ }
+
+ @TestOnly
+ public synchronized void deleteAll() throws IOException {
+ logger.info("Start deleting all storage groups' timeseries");
+ close();
+
+ File indexMetaDir = IndexUtils.getIndexFile(this.indexMetaDirPath);
+ if (indexMetaDir.exists()) {
+ FileUtils.deleteDirectory(indexMetaDir);
+ }
+
+ File indexDataDir = IndexUtils.getIndexFile(this.indexDataDirPath);
+ if (indexDataDir.exists()) {
+ FileUtils.deleteDirectory(indexDataDir);
+ }
+ File indexRootDir =
+ IndexUtils.getIndexFile(DirectoryManager.getInstance().getIndexRootFolder());
+ if (indexRootDir.exists()) {
+ FileUtils.deleteDirectory(indexRootDir);
+ }
+ }
+
+ @TestOnly
+ public IIndexRouter getRouter() {
+ return router;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
new file mode 100644
index 0000000..95cb0a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+public interface IndexManagerMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
new file mode 100644
index 0000000..dea0b1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+/**
+ * IndexMemTableFlushTask is responsible for the index insertion when a TsFileProcessor flushes.
+ * IoTDB creates a MemTableFlushTask when a memtable is flushed to disk. After MemTableFlushTask
+ * sorts a series, it will be passed to a IndexMemTableFlushTask. The IndexMemTableFlushTask will
+ * detect whether one or more indexes have been created on this series, and pass its data to
+ * corresponding IndexProcessors and insert it into the corresponding indexes.
+ *
+ * <p>IndexMemTableFlushTask may cover more than one index processor.
+ */
+public class IndexMemTableFlushTask {
+
+ private final IIndexRouter router;
+ private final boolean sequence;
+
+ /** it should be immutable. */
+ IndexMemTableFlushTask(IIndexRouter router, boolean sequence) {
+ // check all processors
+ this.router = router;
+ this.sequence = sequence;
+ // in current version, we don't build index for unsequence block
+ if (sequence) {
+ for (IndexProcessorStruct p : router.getAllIndexProcessorsAndInfo()) {
+ p.processor.startFlushMemTable();
+ }
+ }
+ }
+
+ /**
+ * insert sorted time series
+ *
+ * @param path the path of time series
+ * @param tvList the sorted data
+ */
+ public void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+ // in current version, we don't build index for unsequence block, but only update the index
+ // usability range.
+ if (sequence) {
+ router.getIndexProcessorByPath(path).forEach(p -> p.buildIndexForOneSeries(path, tvList));
+ } else {
+ router.getIndexProcessorByPath(path).forEach(p -> p.updateUnsequenceData(path, tvList));
+ }
+ }
+
+ /** wait for all IndexProcessors to finish building indexes. */
+ public void endFlush() {
+ if (sequence) {
+ router.getAllIndexProcessorsAndInfo().forEach(p -> p.processor.endFlushMemTable());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
new file mode 100644
index 0000000..95cf721
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.IndexRuntimeException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.IndexNaiveFunc;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Each {@code IndexProcessor} manages all index instances under an <b>IndexSeries</b>.
+ *
+ * <p>An <b>IndexSeries</b> is one series or a set of series that an index instance acts on.
+ *
+ * <ul>
+ * <li>For whole matching, the IndexSeries is a set of series represented by a path with wildcard
+ * characters. For example, creating index on IndexSeries "root.steel.*.temperature" means all
+ * series belong to this IndexSeries will be inserted, such as root.steel.s1.temperature,
+ * root. steel.s2.temperature, ...
+ * <li>For subsequence matching, the IndexSeries is a full path. For example, creating index on
+ * IndexSeries "root.wind.azq01.speed" means all subsequence of this series will be inserted.
+ * </ul>
+ *
+ * In current design。users are allowed to create more than one type of index on an IndexSeries. e.g.
+ * we can create RTree and DS-Tree on "root.steel.*.temperature". New coming data will be inserted
+ * into both of these two indexes.
+ */
+public class IndexProcessor implements Comparable<IndexProcessor> {
+
+ private static final Logger logger = LoggerFactory.getLogger(IndexProcessor.class);
+
+ private PartialPath indexSeries;
+ private final String indexSeriesDirPath;
+ private TSDataType tsDataType;
+ private final IndexBuildTaskPoolManager indexBuildPoolManager;
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private Map<IndexType, ReadWriteLock> indexLockMap;
+
+ /**
+ * How many indexes of this IndexProcessor are currently inserting data. If 0, no indexes are
+ * inserting, that is, this IndexProcessor is not in flushing state.
+ */
+ private AtomicInteger numIndexBuildTasks;
+
+ /** Whether the processor has been closed */
+ private volatile boolean closed;
+
+ /** The map of index instances. */
+ private Map<IndexType, IoTDBIndex> allPathsIndexMap;
+
+ /** Some status data saved when index is closed for next open. */
+ private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+ private final String previousDataBufferFile;
+
+ /**
+ * Each index instance corresponds to an {@linkplain IIndexUsable}. All IIndexUsable of a
+ * IndexProcessor are stored in one file.
+ */
+ private Map<IndexType, IIndexUsable> usableMap;
+
+ private final String usableFile;
+
+ /** The optimizer for the post-processing phase (refinement phase). Unused yet. */
+ private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+
+ public IndexProcessor(PartialPath indexSeries, String indexSeriesDirPath) {
+ this.indexBuildPoolManager = IndexBuildTaskPoolManager.getInstance();
+
+ this.numIndexBuildTasks = new AtomicInteger(0);
+ this.indexSeries = indexSeries;
+ this.indexSeriesDirPath = indexSeriesDirPath;
+ File dir = IndexUtils.getIndexFile(indexSeriesDirPath);
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ this.closed = false;
+ this.allPathsIndexMap = new EnumMap<>(IndexType.class);
+ this.previousDataBufferMap = new EnumMap<>(IndexType.class);
+ this.indexLockMap = new EnumMap<>(IndexType.class);
+ this.usableMap = new EnumMap<>(IndexType.class);
+ this.previousDataBufferFile = indexSeriesDirPath + File.separator + "previousBuffer";
+ this.usableFile = indexSeriesDirPath + File.separator + "usableMap";
+ this.tsDataType = initSeriesType();
+ this.refinePhaseOptimizer = IIndexCandidateOrderOptimize.Factory.getOptimize();
+ deserializePreviousBuffer();
+ deserializeUsable(indexSeries);
+ }
+
+ /**
+ * Determines the data type of the index instances. All time series covered by this IndexProcessor
+ * should have the same data type.
+ *
+ * @return tsDataType of this IndexProcessor
+ */
+ private TSDataType initSeriesType() {
+ try {
+ if (indexSeries.isFullPath()) {
+ return MManager.getInstance().getSeriesType(indexSeries);
+ } else {
+ List<PartialPath> list =
+ IoTDB.metaManager.getAllTimeseriesPathWithAlias(indexSeries, 1, 0).left;
+ if (list.isEmpty()) {
+ throw new IndexRuntimeException("No series in the wildcard path");
+ } else {
+ return MManager.getInstance().getSeriesType(list.get(0));
+ }
+ }
+ } catch (MetadataException e) {
+ throw new IndexRuntimeException("get type failed. ", e);
+ }
+ }
+
+ private String getIndexDir(IndexType indexType) {
+ return indexSeriesDirPath + File.separator + indexType;
+ }
+
+ private void serializeUsable() {
+ File file = SystemFileFactory.INSTANCE.getFile(usableFile);
+ try (OutputStream outputStream = new FileOutputStream(file)) {
+ ReadWriteIOUtils.write(usableMap.size(), outputStream);
+ for (Entry<IndexType, IIndexUsable> entry : usableMap.entrySet()) {
+ IndexType indexType = entry.getKey();
+ ReadWriteIOUtils.write(indexType.serialize(), outputStream);
+ IIndexUsable v = entry.getValue();
+ v.serialize(outputStream);
+ }
+ } catch (IOException e) {
+ logger.error("Error when serialize usability. Given up.", e);
+ }
+ }
+
+ private void serializePreviousBuffer() {
+ File file = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+ try (OutputStream outputStream = new FileOutputStream(file)) {
+ ReadWriteIOUtils.write(previousDataBufferMap.size(), outputStream);
+ for (Entry<IndexType, ByteBuffer> entry : previousDataBufferMap.entrySet()) {
+ IndexType indexType = entry.getKey();
+ ByteBuffer buffer = entry.getValue();
+ ReadWriteIOUtils.write(indexType.serialize(), outputStream);
+ ReadWriteIOUtils.write(buffer, outputStream);
+ }
+ } catch (IOException e) {
+ logger.error("Error when serialize previous buffer. Given up.", e);
+ }
+ }
+
+ private void deserializePreviousBuffer() {
+ File file = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+ if (!file.exists()) {
+ return;
+ }
+ try (InputStream inputStream = new FileInputStream(file)) {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ IndexType indexType = IndexType.deserialize(ReadWriteIOUtils.readShort(inputStream));
+ ByteBuffer byteBuffer =
+ ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(inputStream);
+ previousDataBufferMap.put(indexType, byteBuffer);
+ }
+ } catch (IOException e) {
+ logger.error("Error when deserialize previous buffer. Given up.", e);
+ }
+ }
+
+ private void deserializeUsable(PartialPath indexSeries) {
+ File file = SystemFileFactory.INSTANCE.getFile(usableFile);
+ if (!file.exists()) {
+ return;
+ }
+ try (InputStream inputStream = new FileInputStream(file)) {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ short indexTypeShort = ReadWriteIOUtils.readShort(inputStream);
+ IndexType indexType = IndexType.deserialize(indexTypeShort);
+ IIndexUsable usable =
+ IIndexUsable.Factory.deserializeIndexUsability(indexSeries, inputStream);
+ usableMap.put(indexType, usable);
+ }
+ } catch (IOException | IllegalPathException e) {
+ logger.error("Error when deserialize usability. Given up.", e);
+ }
+ }
+
+ /** Flush all index instances to disk except NoIndex. */
+ @SuppressWarnings("squid:S2589")
+ public synchronized void close(boolean deleteFiles) throws IOException {
+ if (closed) {
+ return;
+ }
+ waitingFlushEndAndDo(
+ () -> {
+ lock.writeLock().lock();
+ try {
+ // store Preprocessor
+ for (Entry<IndexType, IoTDBIndex> entry : allPathsIndexMap.entrySet()) {
+ IndexType indexType = entry.getKey();
+ if (indexType == IndexType.NO_INDEX) {
+ continue;
+ }
+ IoTDBIndex index = entry.getValue();
+ previousDataBufferMap.put(entry.getKey(), index.closeAndRelease());
+ }
+ logger.info("close and release index processor: {}", indexSeries);
+ allPathsIndexMap.clear();
+ serializeUsable();
+ serializePreviousBuffer();
+ closed = true;
+ if (deleteFiles) {
+ File dir = IndexUtils.getIndexFile(indexSeriesDirPath);
+ dir.delete();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
+ }
+
+ private void waitingFlushEndAndDo(IndexNaiveFunc indexNaiveAction) throws IOException {
+ // wait the flushing end.
+ long waitingTime;
+ long waitingInterval = 100;
+ long st = System.currentTimeMillis();
+ while (true) {
+ if (isFlushing()) {
+ try {
+ Thread.sleep(waitingInterval);
+ } catch (InterruptedException e) {
+ logger.error("interrupted, index insert may not complete.", e);
+ return;
+ }
+ waitingTime = System.currentTimeMillis() - st;
+ // wait for too long time.
+ if (waitingTime > 3000) {
+ waitingInterval = 1000;
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ String.format(
+ "IndexFileProcessor %s: wait-close time %d ms is too long.",
+ indexSeries, waitingTime));
+ }
+ }
+ } else {
+ indexNaiveAction.act();
+ break;
+ }
+ }
+ }
+
+ public PartialPath getIndexSeries() {
+ return indexSeries;
+ }
+
+ @Override
+ public int hashCode() {
+ return indexSeries.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ return compareTo((IndexProcessor) obj) == 0;
+ }
+
+ @Override
+ public String toString() {
+ return indexSeries + ": " + allPathsIndexMap;
+ }
+
+ @Override
+ public int compareTo(IndexProcessor o) {
+ return indexSeries.compareTo(o.indexSeries);
+ }
+
+ private boolean isFlushing() {
+ return numIndexBuildTasks.get() > 0;
+ }
+
+ void startFlushMemTable() {
+ lock.writeLock().lock();
+ try {
+ if (closed) {
+ throw new IndexRuntimeException("closed index file !!!!!");
+ }
+ if (isFlushing()) {
+ throw new IndexRuntimeException("There has been a flushing, do you want to wait?");
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Insert sorted series into index instances.
+ *
+ * @param path the path of time series
+ * @param tvList the sorted series
+ */
+ void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+ // for every index of this path, submit a task to pool.
+ lock.writeLock().lock();
+ numIndexBuildTasks.incrementAndGet();
+ try {
+ if (tvList.getDataType() != tsDataType) {
+ logger.warn(
+ "TsDataType unmatched, ignore: indexSeries {}: {}, given series {}: {}",
+ indexSeries,
+ tsDataType,
+ path,
+ tvList.getDataType());
+ }
+ allPathsIndexMap.forEach(
+ (indexType, index) -> {
+ // NO_INDEX doesn't involve the phase of building index
+ if (indexType == IndexType.NO_INDEX) {
+ numIndexBuildTasks.decrementAndGet();
+ return;
+ }
+ Runnable buildTask =
+ () -> {
+ try {
+ indexLockMap.get(indexType).writeLock().lock();
+ IndexFeatureExtractor extractor = index.startFlushTask(path, tvList);
+ while (extractor.hasNext()) {
+ extractor.processNext();
+ index.buildNext();
+ }
+ index.endFlushTask();
+ } catch (IndexManagerException e) {
+ // Give up the following data, but the previously serialized chunk will not be
+ // affected.
+ logger.error("build index failed", e);
+ } catch (RuntimeException e) {
+ logger.error("RuntimeException", e);
+ } finally {
+ numIndexBuildTasks.decrementAndGet();
+ indexLockMap.get(indexType).writeLock().unlock();
+ }
+ };
+ indexBuildPoolManager.submit(buildTask);
+ });
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** Not return until all index insertion have finished. */
+ void endFlushMemTable() {
+ // wait until all flushing tasks end.
+ try {
+ waitingFlushEndAndDo(() -> {});
+ } catch (IOException ignored) {
+ // the exception is ignored
+ }
+ }
+
+ /**
+ * According to the passed-in {@code indexInfoMap}, refresh the index instances in the current
+ * IndexProcessor. When someone index is created or dropped, the IndexProcessor is out of date, so
+ * it needs to be refreshed.
+ *
+ * @param indexInfoMap passed from {@link IIndexRouter}
+ */
+ public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap) {
+ lock.writeLock().lock();
+ try {
+ // Add indexes that are not in the previous map
+ for (Entry<IndexType, IndexInfo> entry : indexInfoMap.entrySet()) {
+ IndexType indexType = entry.getKey();
+ IndexInfo indexInfo = entry.getValue();
+ if (!allPathsIndexMap.containsKey(indexType)) {
+ IoTDBIndex index =
+ IndexType.constructIndex(
+ indexSeries,
+ tsDataType,
+ getIndexDir(indexType),
+ indexType,
+ indexInfo,
+ previousDataBufferMap.get(indexType));
+ allPathsIndexMap.putIfAbsent(indexType, index);
+ indexLockMap.putIfAbsent(indexType, new ReentrantReadWriteLock());
+ usableMap.putIfAbsent(
+ indexType, IIndexUsable.Factory.createEmptyIndexUsability(indexSeries));
+ }
+ }
+
+ // remove indexes that are removed from the previous map
+ for (IndexType indexType : new ArrayList<>(allPathsIndexMap.keySet())) {
+ if (!indexInfoMap.containsKey(indexType)) {
+ try {
+ allPathsIndexMap.get(indexType).closeAndRelease();
+ } catch (IOException e) {
+ logger.warn(
+ "Meet error when close {} before removing index, {}", indexType, e.getMessage());
+ }
+ // remove index file directories
+ File dir = IndexUtils.getIndexFile(getIndexDir(indexType));
+ dir.delete();
+ allPathsIndexMap.remove(indexType);
+ usableMap.remove(indexType);
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** For unsequence data, mark them as "index-unusable" in corresponding IIndexUsable */
+ void updateUnsequenceData(PartialPath path, TVList tvList) {
+ this.usableMap.forEach(
+ (indexType, usable) ->
+ usable.minusUsableRange(path, tvList.getMinTime(), tvList.getLastTime()));
+ }
+
+ /**
+ * index query.
+ *
+ * @param indexType the type of index to be queried
+ * @param queryProps the query conditions
+ * @param context the query context
+ * @param alignedByTime true if result series need to be aligned by the timestamp.
+ * @return index query result
+ */
+ public QueryDataSet query(
+ IndexType indexType,
+ Map<String, Object> queryProps,
+ QueryContext context,
+ boolean alignedByTime)
+ throws QueryIndexException {
+ try {
+ lock.readLock().lock();
+ try {
+ if (!indexLockMap.containsKey(indexType)) {
+ throw new QueryIndexException(
+ String.format(
+ "%s hasn't been built on %s", indexType.toString(), indexSeries.getFullPath()));
+ } else {
+ indexLockMap.get(indexType).readLock().lock();
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ IoTDBIndex index = allPathsIndexMap.get(indexType);
+ return index.query(
+ queryProps, this.usableMap.get(indexType), context, refinePhaseOptimizer, alignedByTime);
+
+ } finally {
+ indexLockMap.get(indexType).readLock().unlock();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
new file mode 100644
index 0000000..984e4ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.common.DistSeries;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * For index developers, the indexing framework aims to provide a simple and friendly platform and
+ * shield the complex details in other modules.
+ *
+ * <p>To add a new index methods, developers need inherit {@linkplain IoTDBIndex} or its subclass.
+ */
+public abstract class IoTDBIndex {
+
+ protected final PartialPath indexSeries;
+ protected final IndexType indexType;
+ protected final TSDataType tsDataType;
+
+ protected final Map<String, String> props;
+ protected IndexFeatureExtractor indexFeatureExtractor;
+
+ public IoTDBIndex(PartialPath indexSeries, TSDataType tsDataType, IndexInfo indexInfo) {
+ this.indexSeries = indexSeries;
+ this.indexType = indexInfo.getIndexType();
+ this.props = indexInfo.getProps();
+ this.tsDataType = tsDataType;
+ }
+
+ /**
+ * An index should determine which FeatureExtractor it uses and hook it to {@linkplain *
+ * IoTDBIndex}.indexFeatureExtractor. This method is called when IoTDBIndex is created.
+ *
+ * @param previous the status data saved in the last closing of the FeatureExtractor
+ * @param inQueryMode true if it's during index query, false if it's during index building
+ */
+ public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+ /** A new item has been pre-processed by the FeatureExtractor, now the index can insert it. */
+ public abstract boolean buildNext() throws IndexManagerException;
+
+ /** This index will be closed, it's time to serialize in-memory data to disk for next open. */
+ protected abstract void flushIndex();
+
+ /**
+ * execute index query and return the result.
+ *
+ * @param queryProps query conditions
+ * @param iIndexUsable the information of index usability
+ * @param context query context provided by IoTDB.
+ * @param candidateOrderOptimize an optimizer for the order of visiting candidates
+ * @param alignedByTime true if the result series need to aligned by timestamp, otherwise they
+ * will be aligned by their first points
+ * @return the result should be consistent with other IoTDB query result.
+ */
+ public abstract QueryDataSet query(
+ Map<String, Object> queryProps,
+ IIndexUsable iIndexUsable,
+ QueryContext context,
+ IIndexCandidateOrderOptimize candidateOrderOptimize,
+ boolean alignedByTime)
+ throws QueryIndexException;
+
+ /**
+ * In current design, the index building (insert data) only occurs in the memtable flush. When
+ * this method is called, a batch of raw data is coming.
+ *
+ * @param tvList tvList to insert
+ * @return FeatureExtractor filled with the given raw data
+ */
+ public IndexFeatureExtractor startFlushTask(PartialPath partialPath, TVList tvList) {
+ this.indexFeatureExtractor.appendNewSrcData(tvList);
+ return indexFeatureExtractor;
+ }
+
+ /** The flush task has ended. */
+ public void endFlushTask() {
+ indexFeatureExtractor.clearProcessedSrcData();
+ }
+
+ /** Close the index, release resources of the index structure and the feature extractor. */
+ public ByteBuffer closeAndRelease() throws IOException {
+ flushIndex();
+ if (indexFeatureExtractor != null) {
+ return indexFeatureExtractor.closeAndRelease();
+ } else {
+ return ByteBuffer.allocate(0);
+ }
+ }
+
+ public TSDataType getTsDataType() {
+ return tsDataType;
+ }
+
+ public IndexType getIndexType() {
+ return indexType;
+ }
+
+ @Override
+ public String toString() {
+ return indexType.toString();
+ }
+
+ protected QueryDataSet constructSearchDataset(List<DistSeries> res, boolean alignedByTime)
+ throws QueryIndexException {
+ return constructSearchDataset(
+ res, alignedByTime, IoTDBDescriptor.getInstance().getConfig().getMaxIndexQueryResultSize());
+ }
+
+ protected QueryDataSet constructSearchDataset(
+ List<DistSeries> res, boolean alignedByTime, int nMaxReturnSeries)
+ throws QueryIndexException {
+ if (alignedByTime) {
+ throw new QueryIndexException("Unsupported alignedByTime result");
+ }
+ // make result paths and types
+ nMaxReturnSeries = Math.min(nMaxReturnSeries, res.size());
+ List<PartialPath> paths = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ Map<String, Integer> pathToIndex = new HashMap<>();
+ int nMinLength = Integer.MAX_VALUE;
+ for (int i = 0; i < nMaxReturnSeries; i++) {
+ PartialPath series = res.get(i).partialPath;
+ paths.add(series);
+ pathToIndex.put(series.getFullPath(), i);
+ types.add(tsDataType);
+ if (res.get(i).tvList.size() < nMinLength) {
+ nMinLength = res.get(i).tvList.size();
+ }
+ }
+ IndexQueryDataSet dataSet = new IndexQueryDataSet(paths, types, pathToIndex);
+ if (nMaxReturnSeries == 0) {
+ return dataSet;
+ }
+ for (int row = 0; row < nMinLength; row++) {
+ RowRecord record = new RowRecord(row);
+ for (int col = 0; col < nMaxReturnSeries; col++) {
+ TVList tvList = res.get(col).tvList;
+ record.addField(IndexUtils.getValue(tvList, row), tsDataType);
+ }
+ dataSet.putRecord(record);
+ }
+ return dataSet;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
new file mode 100644
index 0000000..1e9ff88
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * NoIndex do nothing on feature extracting and data pruning. Its index-available range is always
+ * empty.
+ */
+public class NoIndex extends IoTDBIndex {
+
+ public NoIndex(PartialPath path, TSDataType tsDataType, IndexInfo indexInfo) {
+ super(path, tsDataType, indexInfo);
+ }
+
+ @Override
+ public void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode) {
+ // NoIndex does nothing
+ }
+
+ @Override
+ public boolean buildNext() {
+ return true;
+ }
+
+ @Override
+ protected void flushIndex() {
+ // NoIndex does nothing
+ }
+
+ @Override
+ public QueryDataSet query(
+ Map<String, Object> queryProps,
+ IIndexUsable iIndexUsable,
+ QueryContext context,
+ IIndexCandidateOrderOptimize candidateOrderOptimize,
+ boolean alignedByTime) {
+ return new IndexQueryDataSet(
+ Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
new file mode 100644
index 0000000..aa50be1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+public class DistSeries {
+
+ public double dist;
+ public TVList tvList;
+ public PartialPath partialPath;
+
+ public DistSeries(double dist, TVList tvList, PartialPath partialPath) {
+ this.dist = dist;
+ this.tvList = tvList;
+ this.partialPath = partialPath;
+ }
+
+ public String toString() {
+ return "(" + partialPath + "," + dist + ":" + tvList + ")";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
index d971aa7..3c0cd32 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
@@ -19,18 +19,72 @@ package org.apache.iotdb.db.index.common;
public class IndexConstant {
+ // SQL show
+ public static final String ID = "ID";
+ public static final String NON_IMPLEMENTED_MSG = "Not implemented yet";
+
+ public static final String ROUTER_DIR = "router";
+ public static final String META_DIR_NAME = "index_meta";
+ public static final String INDEX_DATA_DIR_NAME = "index_data";
+ public static final String STORAGE_GROUP_INDEXING_SUFFIX = ".sg_indexing";
+ public static final String STORAGE_GROUP_INDEXED_SUFFIX = ".sg_index";
+
+ public static final String INDEXING_SUFFIX = ".indexing";
+ public static final String INDEXED_SUFFIX = ".index";
+
// whole matching
public static final int NON_SET_TOP_K = -1;
public static final String TOP_K = "TOP_K";
+ // subsequence matching: sliding window
+ public static final String INDEX_WINDOW_RANGE = "INDEX_WINDOW_RANGE";
+ public static final String INDEX_RANGE_STRATEGY = "INDEX_RANGE_STRATEGY";
+ public static final String INDEX_SLIDE_STEP = "INDEX_SLIDE_STEP";
+
+ public static final String INDEX_MAGIC = "IoTDBIndex";
+ public static final String DEFAULT_PROP_NAME = "DEFAULT";
+
+ public static final int INDEX_MAP_INIT_RESERVE_SIZE = 5;
+
public static final String PATTERN = "PATTERN";
public static final String THRESHOLD = "THRESHOLD";
+ public static final String BORDER = "BORDER";
+
+ // MBR Index parameters
+ public static final String FEATURE_DIM = "FEATURE_DIM";
+ public static final String DEFAULT_FEATURE_DIM = "4";
+ public static final String SEED_PICKER = "SEED_PICKER";
+ public static final String MAX_ENTRIES = "MAX_ENTRIES";
+ public static final String MIN_ENTRIES = "MIN_ENTRIES";
// RTree PAA parameters
public static final String PAA_DIM = "PAA_DIM";
+ public static final String SERIES_LENGTH = "SERIES_LENGTH";
+ public static final String DEFAULT_SERIES_LENGTH = "16";
+ public static final String DEFAULT_RTREE_PAA_DISTANCE = "2";
+
+ // Distance
+ public static final String DISTANCE = "DISTANCE";
+ public static final String L_INFINITY = "L_INFINITY";
+ public static final String DEFAULT_DISTANCE = "2";
+
+ // ELB Type
+ public static final String ELB_TYPE = "ELB_TYPE";
+ public static final String ELB_TYPE_ELE = "ELE";
+ public static final String ELB_TYPE_SEQ = "SEQ";
+ public static final String DEFAULT_ELB_TYPE = "SEQ";
+ public static final int DEFAULT_BLOCK_SIZE = 20;
// ELB: calc param
public static final String BLOCK_SIZE = "BLOCK_SIZE";
+ public static final String ELB_CALC_PARAM = "ELB_CALC_PARAM";
+ public static final String DEFAULT_ELB_CALC_PARAM = "SINGLE";
+ public static final String ELB_CALC_PARAM_SINGLE = "SINGLE";
+ public static final String ELB_THRESHOLD_BASE = "ELB_THRESHOLD_BASE";
+ public static final String ELB_THRESHOLD_RATIO = "ELB_THRESHOLD_RATIO";
+ public static final double ELB_DEFAULT_THRESHOLD_RATIO = 0.1;
+
+ public static final String MISSING_PARAM_ERROR_MESSAGE = "missing parameter: %s";
private IndexConstant() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
new file mode 100644
index 0000000..e2e67ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IndexInfo implements Cloneable {
+
+ private Map<String, String> props;
+ private long time;
+ private IndexType indexType;
+
+ public IndexInfo(IndexType indexType, long time, Map<String, String> props) {
+ this.props = props;
+ this.time = time;
+ this.indexType = indexType;
+ }
+
+ public Map<String, String> getProps() {
+ return props;
+ }
+
+ public void setProps(Map<String, String> props) {
+ this.props = props;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+
+ public void setIndexType(IndexType indexType) {
+ this.indexType = indexType;
+ }
+
+ public IndexType getIndexType() {
+ return indexType;
+ }
+
+ @Deprecated
+ public String serializeCreateIndex(String path) {
+ StringBuilder res = new StringBuilder();
+ res.append(
+ String.format(
+ "%s,%s,%s,%s", MetadataOperationType.CREATE_INDEX, path, indexType.serialize(), time));
+ if (props != null && !props.isEmpty()) {
+ for (Map.Entry entry : props.entrySet()) {
+ res.append(String.format(",%s=%s", entry.getKey(), entry.getValue()));
+ }
+ }
+ return res.toString();
+ }
+
+ /**
+ * @param args [0] is the MetadataType, [1] is the path, the rest is to be parsed.
+ * @return parsed IndexInfo
+ */
+ @Deprecated
+ public static IndexInfo deserializeCreateIndex(String[] args) {
+ IndexType indexType = IndexType.deserialize(Short.parseShort(args[2]));
+ long time = Long.parseLong(args[3]);
+ HashMap<String, String> indexProps = null;
+ if (args.length > 4) {
+ String[] kv;
+ indexProps = new HashMap<>(args.length - 4 + 1, 1);
+ for (int k = 4; k < args.length; k++) {
+ kv = args[k].split("=");
+ indexProps.put(kv[0], kv[1]);
+ }
+ }
+ return new IndexInfo(indexType, time, indexProps);
+ }
+
+ @Deprecated
+ public static String serializeDropIndex(String path, IndexType indexType) {
+ return String.format("%s,%s,%s", MetadataOperationType.DROP_INDEX, path, indexType.serialize());
+ }
+
+ /**
+ * @param args [0] is the MetadataType, [1] is the path, the rest is to be parsed.
+ * @return parsed IndexInfo
+ */
+ @Deprecated
+ public static IndexType deserializeDropIndex(String[] args) {
+ return IndexType.deserialize(Short.parseShort(args[2]));
+ }
+
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(indexType.serialize(), outputStream);
+ ReadWriteIOUtils.write(time, outputStream);
+ ReadWriteIOUtils.write(props, outputStream);
+ }
+
+ public static IndexInfo deserialize(InputStream inputStream) throws IOException {
+ short indexTypeShort = ReadWriteIOUtils.readShort(inputStream);
+ IndexType indexType = IndexType.deserialize(indexTypeShort);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ Map<String, String> indexProps = ReadWriteIOUtils.readMap(inputStream);
+ return new IndexInfo(indexType, time, indexProps);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[type: %s, time: %d, props: %s]", indexType, time, props);
+ }
+
+ @Override
+ public Object clone() {
+ return new IndexInfo(indexType, time, new HashMap<>(props));
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
new file mode 100644
index 0000000..ae3a875
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class IndexProcessorStruct {
+
+ public IndexProcessor processor;
+ public PartialPath representativePath;
+ public Map<IndexType, IndexInfo> infos;
+
+ public IndexProcessorStruct(
+ IndexProcessor processor, PartialPath representativePath, Map<IndexType, IndexInfo> infos) {
+ this.processor = processor;
+ this.representativePath = representativePath;
+ this.infos = infos;
+ }
+
+ public List<StorageGroupProcessor> addMergeLock() throws StorageEngineException {
+ return StorageEngine.getInstance().mergeLock(Collections.singletonList(representativePath));
+ }
+
+ @Override
+ public String toString() {
+ return "<" + infos + "\n" + processor + ">";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
index 177e7bd..ec94158 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
@@ -19,13 +19,20 @@
package org.apache.iotdb.db.index.common;
import org.apache.iotdb.db.exception.index.UnsupportedIndexTypeException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.algorithm.NoIndex;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
public enum IndexType {
NO_INDEX,
RTREE_PAA,
- ELB_INDEX,
- KV_INDEX;
+ ELB_INDEX;
/**
* judge the index type.
@@ -41,17 +48,11 @@ public enum IndexType {
return RTREE_PAA;
case 2:
return ELB_INDEX;
- case 3:
- return KV_INDEX;
default:
throw new NotImplementedException("Given index is not implemented");
}
}
- public static int getSerializedSize() {
- return Short.BYTES;
- }
-
/**
* judge the index deserialize type.
*
@@ -65,8 +66,6 @@ public enum IndexType {
return 1;
case ELB_INDEX:
return 2;
- case KV_INDEX:
- return 3;
default:
throw new NotImplementedException("Given index is not implemented");
}
@@ -81,4 +80,41 @@ public enum IndexType {
throw new UnsupportedIndexTypeException(indexTypeString);
}
}
+
+ private static IoTDBIndex newIndexByType(
+ PartialPath path,
+ TSDataType tsDataType,
+ String indexDir,
+ IndexType indexType,
+ IndexInfo indexInfo) {
+ switch (indexType) {
+ case NO_INDEX:
+ return new NoIndex(path, tsDataType, indexInfo);
+ case ELB_INDEX:
+ // return new ELBIndex(path, tsDataType, indexDir, indexInfo);
+ case RTREE_PAA:
+ // return new RTreePAAIndex(path, tsDataType, indexDir, indexInfo);
+ default:
+ throw new NotImplementedException("unsupported index type:" + indexType);
+ }
+ }
+
+ public static IoTDBIndex constructIndex(
+ PartialPath indexSeries,
+ TSDataType tsDataType,
+ String indexDir,
+ IndexType indexType,
+ IndexInfo indexInfo,
+ ByteBuffer previous) {
+ indexInfo.setProps(uppercaseStringProps(indexInfo.getProps()));
+ IoTDBIndex index = newIndexByType(indexSeries, tsDataType, indexDir, indexType, indexInfo);
+ index.initFeatureExtractor(previous, false);
+ return index;
+ }
+
+ private static Map<String, String> uppercaseStringProps(Map<String, String> props) {
+ Map<String, String> uppercase = new HashMap<>(props.size());
+ props.forEach((k, v) -> uppercase.put(k.toUpperCase(), v.toUpperCase()));
+ return uppercase;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
index 98809f0..f930cff 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
@@ -17,8 +17,40 @@
*/
package org.apache.iotdb.db.index.common;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
public class IndexUtils {
+ /**
+ * justify whether the fullPath matches the pathWithStar. The two paths must have the same node
+ * lengths. Refer to {@code org.apache.iotdb.db.metadata.MManager#match}
+ *
+ * @param pathWithStar a path with wildcard characters.
+ * @param fullPath a full path without wildcard characters.
+ * @return true if fullPath matches pathWithStar.
+ */
+ public static boolean match(PartialPath pathWithStar, PartialPath fullPath) {
+ String[] fullNodes = fullPath.getNodes();
+ String[] starNodes = pathWithStar.getNodes();
+ if (starNodes.length != fullNodes.length) {
+ return false;
+ }
+ for (int i = 0; i < fullNodes.length; i++) {
+ if (!"*".equals(starNodes[i]) && !fullNodes[i].equals(starNodes[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public static String removeQuotation(String v) {
int start = 0;
int end = v.length();
@@ -31,5 +63,48 @@ public class IndexUtils {
return v.substring(start, end);
}
+ public static File getIndexFile(String filePath) {
+ return SystemFileFactory.INSTANCE.getFile(filePath);
+ }
+
private IndexUtils() {}
+
+ public static Map<String, Object> toLowerCaseProps(Map<String, Object> props) {
+ Map<String, Object> uppercase = new HashMap<>(props.size());
+ for (Entry<String, Object> entry : props.entrySet()) {
+ String k = entry.getKey();
+ Object v = entry.getValue();
+ if (v instanceof String) {
+ uppercase.put(k.toUpperCase(), ((String) v).toUpperCase());
+ } else {
+ uppercase.put(k.toUpperCase(), v);
+ }
+ }
+ return uppercase;
+ }
+
+ public static Object getValue(TVList srcData, int idx) {
+ switch (srcData.getDataType()) {
+ case INT32:
+ return srcData.getInt(idx);
+ case INT64:
+ return srcData.getLong(idx);
+ case FLOAT:
+ return srcData.getFloat(idx);
+ case DOUBLE:
+ return (float) srcData.getDouble(idx);
+ default:
+ throw new NotImplementedException(srcData.getDataType().toString());
+ }
+ }
+
+ /**
+ * "*" is illegal in Windows directory path. Replace it with "#"
+ *
+ * @param previousDir path which may contains "*"
+ * @return path replacing "*" with "#"
+ */
+ public static String removeIllegalStarInDir(String previousDir) {
+ return previousDir.replace('*', '#');
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
new file mode 100644
index 0000000..3ebca7c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Map;
+
+/**
+ * Do something without input and output.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act(PartialPath indexSeries, Map indexInfoMap)}.
+ */
+@FunctionalInterface
+public interface CreateIndexProcessorFunc {
+
+ /** Do something. */
+ IndexProcessor act(PartialPath indexSeries, Map<IndexType, IndexInfo> indexInfoMap);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
new file mode 100644
index 0000000..6641f60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import java.io.IOException;
+
+/**
+ * Do something without input and output.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act()}.
+ */
+@FunctionalInterface
+public interface IndexNaiveFunc {
+
+ /** Do something. */
+ void act() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
new file mode 100644
index 0000000..64baed2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.common.math;
+
+import org.apache.iotdb.db.index.common.math.probability.UniformProba;
+import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Random;
+
+public class Randomwalk {
+
+ public static TVList generateRanWalkTVList(long length, long seed, float R, float miu) {
+ TVList res = TVListAllocator.getInstance().allocate(TSDataType.DOUBLE);
+ double lastPoint = R;
+ Random r = new Random(seed);
+ UniformProba uniform = new UniformProba(miu / 2, -miu / 2, r);
+ for (int i = 0; i < length; i++) {
+ lastPoint = lastPoint + uniform.getNextRandom();
+ res.putDouble(i, lastPoint);
+ }
+ return res;
+ }
+
+ public static TVList generateRanWalkTVList(long length) {
+ return generateRanWalkTVList(length, 0, 0, 1);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
index 7b8b8cf..044f915 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
@@ -15,15 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+public abstract class Probability {
-public class QueryIndexException extends QueryProcessException {
-
- private static final long serialVersionUID = 9019233783504576296L;
-
- public QueryIndexException(String message, int errorCode) {
- super(message, errorCode);
- }
+ public abstract double getNextRandom();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
index 7b8b8cf..909d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
@@ -15,15 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import java.util.Random;
-public class QueryIndexException extends QueryProcessException {
+public class UniformProba extends Probability {
- private static final long serialVersionUID = 9019233783504576296L;
+ private final double upBound;
+ private final double downBound;
+ private final double range;
+ private Random r;
- public QueryIndexException(String message, int errorCode) {
- super(message, errorCode);
+ public UniformProba(double upBound, double downBound, Random r) {
+ assert upBound > downBound;
+ this.upBound = upBound;
+ this.downBound = downBound;
+ this.range = upBound - downBound;
+ this.r = r;
+ }
+
+ @Override
+ public double getNextRandom() {
+ return r.nextDouble() * range + downBound;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
new file mode 100644
index 0000000..d1faa1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.feature;
+
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.NON_IMPLEMENTED_MSG;
+
+/**
+ * For all indexes, the raw input sequence has to be pre-processed before it's organized by indexes.
+ * In general, index structure needn't maintain all of original data, but only pointers (e.g. the
+ * identifier [start_time, end_time, series_path] can identify a time sequence uniquely).
+ *
+ * <p>By and large, similarity index supposes the input data are ideal: fixed dimension, equal
+ * interval, not missing values and even not outliers. However, in real scenario, the input series
+ * may contain missing values (thus it's not dimension-fixed) and the point's timestamp may contain
+ * slight offset (thus they are not equal-interval). IndexFeatureExtractor need to preprocess the
+ * series and obtain clean series to insert.
+ *
+ * <p>Many indexes will further extract features of alignment sequences, such as PAA, SAX, FFT, etc.
+ *
+ * <p>In summary, the IndexFeatureExtractor can provide three-level information:
+ *
+ * <ul>
+ * <li>L1: a triplet to identify a series: {@code {StartTime, EndTime, Length}} (not submitted in
+ * this pr)
+ * <li>L2: aligned sequence: {@code {a1, a2, ..., an}}
+ * <li>L3: feature: {@code {C1, C2, ..., Cm}}
+ * </ul>
+ */
+public abstract class IndexFeatureExtractor {
+
+ /** In the BUILD and QUERY modes, the IndexFeatureExtractor may work differently. */
+ protected boolean inQueryMode;
+
+ IndexFeatureExtractor(boolean inQueryMode) {
+ this.inQueryMode = inQueryMode;
+ }
+
+ /**
+ * Input a list of new data into the FeatureProcessor.
+ *
+ * @param newData new coming data.
+ */
+ public abstract void appendNewSrcData(TVList newData);
+
+ /**
+ * Input a list of new data into the FeatureProcessor. Due to the exist codes, IoTDB generate
+ * TVList in the insert phase, but obtain BatchData in the query phase.
+ *
+ * @param newData new coming data.
+ */
+ public abstract void appendNewSrcData(BatchData newData);
+
+ /** Having done {@code appendNewSrcData}, the index framework will check {@code hasNext}. */
+ public abstract boolean hasNext();
+
+ /**
+ * If {@code hasNext} returns true, the index framework will call {@code processNext} which
+ * prepares a new series item (ready for insert).
+ */
+ public abstract void processNext();
+
+ /**
+ * After processing a batch of data, the index framework may call {@code clearProcessedSrcData} to
+ * clean out the processed data for releasing memory.
+ */
+ public abstract void clearProcessedSrcData();
+
+ /**
+ * close this extractor and release all allocated resources (TVList and PrimitiveList). It also
+ * returns the data saved for next open.
+ *
+ * @return the data saved for next open.
+ */
+ public abstract ByteBuffer closeAndRelease() throws IOException;
+
+ /** @return current L2: aligned sequence. */
+ public Object getCurrent_L2_AlignedSequence() {
+ throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+ }
+
+ /** @return current L3: costumed feature. */
+ public Object getCurrent_L3_Feature() {
+ throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
new file mode 100644
index 0000000..8d95171
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class IndexQueryDataSet extends ListDataSet {
+
+ private Map<String, Integer> pathToIndex;
+
+ public IndexQueryDataSet(
+ List<PartialPath> paths, List<TSDataType> dataTypes, Map<String, Integer> pathToIndex) {
+ super(paths, dataTypes);
+ this.pathToIndex = pathToIndex;
+ }
+
+ public Map<String, Integer> getPathToIndex() {
+ return pathToIndex;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
new file mode 100644
index 0000000..c7f07a5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.util.List;
+import java.util.Map;
+
+/** the executor for index query */
+public class QueryIndexExecutor {
+
+ private final Map<String, Object> queryProps;
+ private final IndexType indexType;
+ private final QueryContext context;
+ private final List<PartialPath> paths;
+ private final boolean alignedByTime;
+
+ public QueryIndexExecutor(QueryIndexPlan queryIndexPlan, QueryContext context) {
+ this.indexType = queryIndexPlan.getIndexType();
+ this.queryProps = IndexUtils.toLowerCaseProps(queryIndexPlan.getProps());
+ this.paths = queryIndexPlan.getPaths();
+ this.alignedByTime = queryIndexPlan.isAlignedByTime();
+ this.paths.forEach(PartialPath::toLowerCase);
+ this.context = context;
+ }
+
+ public QueryDataSet executeIndexQuery() throws StorageEngineException, QueryIndexException {
+ // get all related storage group
+ return IndexManager.getInstance()
+ .queryIndex(paths, indexType, queryProps, context, alignedByTime);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
new file mode 100644
index 0000000..c9dcaee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+/**
+ * In the indexing mechanism, it refers to the optimization of the access order of candidate series
+ * after pruning phase. Due to the mismatch between the candidate set order given by the index and
+ * the file organization of TsFile, the access of the refinement phase advised by the index may be
+ * inefficient. We can use this query optimizerto rearrange the candidate set access order.
+ */
+public interface IIndexCandidateOrderOptimize {
+ class Factory {
+
+ private Factory() {
+ // hidden initializer
+ }
+
+ public static IIndexCandidateOrderOptimize getOptimize() {
+ return new NoCandidateOrderOptimizer();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
new file mode 100644
index 0000000..0354bf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+class NoCandidateOrderOptimizer implements IIndexCandidateOrderOptimize {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
new file mode 100644
index 0000000..bc6cfdd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Singleton pattern.
+ *
+ * <p>Firstly, IIndexRouter is responsible for index metadata management. More importantly, it is
+ * for routing the create/drop/insert/query command to corresponding index processors.
+ *
+ * <p>IIndexRouter can decouple the mapping relationship, which may be re-designed in future,
+ * between {@link org.apache.iotdb.db.index.algorithm.IoTDBIndex IoTDBIndex} and {@link
+ * IndexProcessor} from {@link org.apache.iotdb.db.index.IndexManager IndexManager}.
+ */
+public interface IIndexRouter {
+
+ /**
+ * add an new index into the router.
+ *
+ * @param prefixPath the partial path of given index series
+ * @param indexInfo the index infomation.
+ * @param func a function to create a new IndexProcessor, if it's not created before.
+ * @param doSerialize true to serialize the new information immediately.
+ * @return true if adding index information successfully
+ */
+ boolean addIndexIntoRouter(
+ PartialPath prefixPath,
+ IndexInfo indexInfo,
+ CreateIndexProcessorFunc func,
+ boolean doSerialize)
+ throws MetadataException;
+
+ /**
+ * remove an exist index into the router.
+ *
+ * @param prefixPath the partial path of given index series
+ * @param indexType the type of index to be removed.
+ * @return true if removing index information successfully
+ */
+ boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType)
+ throws MetadataException, IOException;
+
+ Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+ Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+ Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+ /**
+ * serialize all index information and processors to the disk
+ *
+ * @param doClose true if close processors after serialization.
+ */
+ void serialize(boolean doClose);
+
+ /** deserialize all index information and processors into the memory */
+ void deserializeAndReload(CreateIndexProcessorFunc func);
+
+ /**
+ * return a subset of the original IIndexRouter for accessing concurrency
+ *
+ * @param storageGroupPath the path of a storageGroup
+ * @return a subset of the original IIndexRouter
+ */
+ IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+ int getIndexNum();
+
+ /**
+ * prepare necessary information for index query
+ *
+ * @param partialPath the query path
+ * @param indexType the index type
+ * @param context the query context
+ * @return the necessary information for this query
+ */
+ IndexProcessorStruct startQueryAndCheck(
+ PartialPath partialPath, IndexType indexType, QueryContext context)
+ throws QueryIndexException;
+
+ /**
+ * do something when the query end
+ *
+ * @param indexSeries the query path
+ * @param indexType the index type
+ * @param context the query context
+ */
+ void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+
+ class Factory {
+
+ private Factory() {
+ // hidden initializer
+ }
+
+ public static IIndexRouter getIndexRouter(String routerDir) {
+ return new ProtoIndexRouter(routerDir);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
new file mode 100644
index 0000000..0892758
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A proto implementation.
+ *
+ * <p>The subsequence-matching index is created on a single time series, while the whole-matching
+ * index is created on a group of time series with wildcards, so {@code ProtoIndexRouter} manages
+ * the two cases with different Map structures.
+ *
+ * <p>A key function of {@code IIndexRouter } is to quickly route the IndexProcessor for a given
+ * series path. If the path is full-path, it can be found as O(1) in {@code fullPathProcessorMap};
+ * Otherwise, you must traverse every key in {@code wildCardProcessorMap}.
+ */
+public class ProtoIndexRouter implements IIndexRouter {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProtoIndexRouter.class);
+
+ /** for subsequence matching indexes */
+ private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+ /** for whole matching indexes */
+ private Map<PartialPath, IndexProcessorStruct> wildCardProcessorMap;
+
+ private Map<String, Set<String>> sgToFullPathMap;
+ private Map<String, Set<PartialPath>> sgToWildCardPathMap;
+ private MManager mManager;
+ private File routerFile;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private boolean unmodifiable;
+
+ ProtoIndexRouter(String routerFileDir) {
+ this(false);
+ this.routerFile = SystemFileFactory.INSTANCE.getFile(routerFileDir + File.separator + "router");
+ mManager = MManager.getInstance();
+ }
+
+ private ProtoIndexRouter(boolean unmodifiable) {
+ this.unmodifiable = unmodifiable;
+ fullPathProcessorMap = new ConcurrentHashMap<>();
+ sgToFullPathMap = new ConcurrentHashMap<>();
+ sgToWildCardPathMap = new ConcurrentHashMap<>();
+ wildCardProcessorMap = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void serialize(boolean closeProcessor) {
+ try (OutputStream outputStream = new FileOutputStream(routerFile)) {
+ ReadWriteIOUtils.write(fullPathProcessorMap.size(), outputStream);
+ for (Entry<String, IndexProcessorStruct> entry : fullPathProcessorMap.entrySet()) {
+ String indexSeries = entry.getKey();
+ IndexProcessorStruct v = entry.getValue();
+ ReadWriteIOUtils.write(indexSeries, outputStream);
+ ReadWriteIOUtils.write(v.infos.size(), outputStream);
+ for (Entry<IndexType, IndexInfo> e : v.infos.entrySet()) {
+ IndexInfo indexInfo = e.getValue();
+ indexInfo.serialize(outputStream);
+ }
+ if (closeProcessor && v.processor != null) {
+ v.processor.close(false);
+ }
+ }
+
+ ReadWriteIOUtils.write(wildCardProcessorMap.size(), outputStream);
+ for (Entry<PartialPath, IndexProcessorStruct> entry : wildCardProcessorMap.entrySet()) {
+ PartialPath indexSeries = entry.getKey();
+ IndexProcessorStruct v = entry.getValue();
+ ReadWriteIOUtils.write(indexSeries.getFullPath(), outputStream);
+ ReadWriteIOUtils.write(v.infos.size(), outputStream);
+ for (Entry<IndexType, IndexInfo> e : v.infos.entrySet()) {
+ IndexInfo indexInfo = e.getValue();
+ indexInfo.serialize(outputStream);
+ }
+ if (closeProcessor && v.processor != null) {
+ v.processor.close(false);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Error when serialize router. Given up.", e);
+ }
+ if (closeProcessor) {
+ fullPathProcessorMap.clear();
+ sgToFullPathMap.clear();
+ sgToWildCardPathMap.clear();
+ wildCardProcessorMap.clear();
+ }
+ }
+
+ @Override
+ public void deserializeAndReload(CreateIndexProcessorFunc func) {
+ if (!routerFile.exists()) {
+ return;
+ }
+ try (InputStream inputStream = new FileInputStream(routerFile)) {
+ int fullSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < fullSize; i++) {
+ String indexSeries = ReadWriteIOUtils.readString(inputStream);
+ int indexTypeSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int j = 0; j < indexTypeSize; j++) {
+ IndexInfo indexInfo = IndexInfo.deserialize(inputStream);
+ addIndexIntoRouter(new PartialPath(indexSeries), indexInfo, func, false);
+ }
+ }
+
+ int wildcardSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < wildcardSize; i++) {
+ String indexSeries = ReadWriteIOUtils.readString(inputStream);
+ int indexTypeSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int j = 0; j < indexTypeSize; j++) {
+ IndexInfo indexInfo = IndexInfo.deserialize(inputStream);
+ addIndexIntoRouter(new PartialPath(indexSeries), indexInfo, func, false);
+ }
+ }
+ } catch (MetadataException | IOException e) {
+ logger.error("Error when deserialize router. Given up.", e);
+ }
+ }
+
+ @Override
+ public IIndexRouter getRouterByStorageGroup(String storageGroupPath) {
+ lock.readLock().lock();
+ try {
+ ProtoIndexRouter res = new ProtoIndexRouter(true);
+ if (sgToWildCardPathMap.containsKey(storageGroupPath)) {
+ for (PartialPath partialPath : sgToWildCardPathMap.get(storageGroupPath)) {
+ res.wildCardProcessorMap.put(partialPath, this.wildCardProcessorMap.get(partialPath));
+ }
+ }
+ if (sgToFullPathMap.containsKey(storageGroupPath)) {
+ for (String fullPath : sgToFullPathMap.get(storageGroupPath)) {
+ res.fullPathProcessorMap.put(fullPath, this.fullPathProcessorMap.get(fullPath));
+ }
+ }
+ return res;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public int getIndexNum() {
+ return fullPathProcessorMap.size() + wildCardProcessorMap.size();
+ }
+
+ @Override
+ public IndexProcessorStruct startQueryAndCheck(
+ PartialPath partialPath, IndexType indexType, QueryContext context)
+ throws QueryIndexException {
+ lock.readLock().lock();
+ try {
+ IndexProcessorStruct struct;
+ if (partialPath.isFullPath()) {
+ String fullPath = partialPath.getFullPath();
+ if (!fullPathProcessorMap.containsKey(fullPath)) {
+ throw new QueryIndexException("haven't create index on " + fullPath);
+ }
+ struct = fullPathProcessorMap.get(fullPath);
+ } else {
+ if (!wildCardProcessorMap.containsKey(partialPath)) {
+ throw new QueryIndexException("haven't create index on " + partialPath);
+ }
+ struct = wildCardProcessorMap.get(partialPath);
+ }
+ if (struct.processor == null) {
+ throw new QueryIndexException("Index hasn't insert any data");
+ }
+ // we don't add lock to index processor here. It doesn't matter.
+ return struct;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context) {
+ // do nothing.
+ }
+
+ @Override
+ public boolean addIndexIntoRouter(
+ PartialPath partialPath,
+ IndexInfo indexInfo,
+ CreateIndexProcessorFunc func,
+ boolean doSerialize)
+ throws MetadataException {
+ if (unmodifiable) {
+ throw new MetadataException("cannot add index to unmodifiable router");
+ }
+ partialPath.toLowerCase();
+ // only the pair.left (indexType map) will be updated.
+ lock.writeLock().lock();
+ IndexType indexType = indexInfo.getIndexType();
+ // record the relationship between storage group and the
+ StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+ String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+ // add to pathMap
+ try {
+ if (partialPath.isFullPath()) {
+ String fullPath = partialPath.getFullPath();
+ if (!fullPathProcessorMap.containsKey(fullPath)) {
+ Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+ infoMap.put(indexType, indexInfo);
+ IndexProcessor processor = func.act(partialPath, infoMap);
+ fullPathProcessorMap.put(
+ fullPath, new IndexProcessorStruct(processor, partialPath, infoMap));
+ } else {
+ fullPathProcessorMap.get(fullPath).infos.put(indexType, indexInfo);
+ }
+ IndexProcessorStruct pair = fullPathProcessorMap.get(fullPath);
+ pair.processor.refreshSeriesIndexMapFromMManager(pair.infos);
+
+ // add to sg
+ Set<String> indexSeriesSet = new HashSet<>();
+ Set<String> preSet = sgToFullPathMap.putIfAbsent(storageGroupPath, indexSeriesSet);
+ if (preSet != null) {
+ indexSeriesSet = preSet;
+ }
+ indexSeriesSet.add(fullPath);
+ } else {
+ if (!wildCardProcessorMap.containsKey(partialPath)) {
+ Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+ infoMap.put(indexType, indexInfo);
+ IndexProcessor processor = func.act(partialPath, infoMap);
+ PartialPath representativePath = getRepresentativePath(partialPath);
+ wildCardProcessorMap.put(
+ partialPath, new IndexProcessorStruct(processor, representativePath, infoMap));
+ } else {
+ wildCardProcessorMap.get(partialPath).infos.put(indexType, indexInfo);
+ }
+ IndexProcessorStruct pair = wildCardProcessorMap.get(partialPath);
+ pair.processor.refreshSeriesIndexMapFromMManager(pair.infos);
+
+ // add to sg
+ Set<PartialPath> indexSeriesSet = new HashSet<>();
+ Set<PartialPath> preSet = sgToWildCardPathMap.putIfAbsent(storageGroupPath, indexSeriesSet);
+ if (preSet != null) {
+ indexSeriesSet = preSet;
+ }
+ indexSeriesSet.add(partialPath);
+ if (doSerialize) {
+ serialize(false);
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return true;
+ }
+
+ private PartialPath getRepresentativePath(PartialPath wildcardPath) throws MetadataException {
+ Pair<List<PartialPath>, Integer> paths =
+ mManager.getAllTimeseriesPathWithAlias(wildcardPath, 1, 0);
+ if (paths.left.isEmpty()) {
+ throw new MetadataException("Please create at least one series before create index");
+ } else {
+ return paths.left.get(0);
+ }
+ }
+
+ @Override
+ public boolean removeIndexFromRouter(PartialPath partialPath, IndexType indexType)
+ throws MetadataException, IOException {
+ if (unmodifiable) {
+ throw new MetadataException("cannot remove index from unmodifiable router");
+ }
+ partialPath.toLowerCase();
+ // only the pair.left (indexType map) will be updated.
+ lock.writeLock().lock();
+ // record the relationship between storage group and the index processors
+ StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+ String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+
+ // remove from pathMap
+ try {
+ if (partialPath.isFullPath()) {
+ String fullPath = partialPath.getFullPath();
+ if (fullPathProcessorMap.containsKey(fullPath)) {
+ IndexProcessorStruct pair = fullPathProcessorMap.get(fullPath);
+ pair.infos.remove(indexType);
+ pair.processor.refreshSeriesIndexMapFromMManager(pair.infos);
+ if (pair.infos.isEmpty()) {
+ sgToFullPathMap.get(storageGroupPath).remove(fullPath);
+ fullPathProcessorMap.remove(fullPath);
+ pair.representativePath = null;
+ pair.processor.close(true);
+ pair.processor = null;
+ }
+ }
+ } else {
+ if (wildCardProcessorMap.containsKey(partialPath)) {
+ IndexProcessorStruct pair = wildCardProcessorMap.get(partialPath);
+ pair.infos.remove(indexType);
+ pair.processor.refreshSeriesIndexMapFromMManager(pair.infos);
+ if (pair.infos.isEmpty()) {
+ sgToWildCardPathMap.get(storageGroupPath).remove(partialPath);
+ wildCardProcessorMap.remove(partialPath);
+ pair.representativePath = null;
+ pair.processor.close(true);
+ pair.processor = null;
+ }
+ }
+ }
+ serialize(false);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries) {
+ lock.readLock().lock();
+ try {
+ if (wildCardProcessorMap.containsKey(indexSeries)) {
+ return Collections.unmodifiableMap(wildCardProcessorMap.get(indexSeries).infos);
+ }
+ if (fullPathProcessorMap.containsKey(indexSeries.getFullPath())) {
+ return Collections.unmodifiableMap(
+ fullPathProcessorMap.get(indexSeries.getFullPath()).infos);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ return new HashMap<>();
+ }
+
+ @Override
+ public Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo() {
+ lock.readLock().lock();
+ List<IndexProcessorStruct> res =
+ new ArrayList<>(wildCardProcessorMap.size() + fullPathProcessorMap.size());
+ try {
+ wildCardProcessorMap.forEach((k, v) -> res.add(v));
+ fullPathProcessorMap.forEach((k, v) -> res.add(v));
+ } finally {
+ lock.readLock().unlock();
+ }
+ return res;
+ }
+
+ @Override
+ public Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries) {
+ lock.readLock().lock();
+ List<IndexProcessor> res = new ArrayList<>();
+ try {
+ if (fullPathProcessorMap.containsKey(timeSeries.getFullPath())) {
+ res.add(fullPathProcessorMap.get(timeSeries.getFullPath()).processor);
+ } else {
+ wildCardProcessorMap.forEach(
+ (k, v) -> {
+ if (k.matchFullPath(timeSeries)) {
+ res.add(v.processor);
+ }
+ });
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ return res;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ Iterable<IndexProcessorStruct> all = getAllIndexProcessorsAndInfo();
+ for (IndexProcessorStruct struct : all) {
+ sb.append(struct.toString()).append("\n");
+ }
+ return sb.toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
new file mode 100644
index 0000000..fe42852
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The data which has flushed out may be updated due to the "unsequence data" or deletion. As we
+ * cannot assume that all index techniques are able to support data deletion or update, the index
+ * framework introduces the concept of "index usability range". In the time range which is marked as
+ * "index unusable", the correctness of index's pruning phase is not guaranteed.
+ *
+ * <p>A natural solution is to put the data in the "index unusable" range into the post-processing
+ * phase (or called refinement phase) directly.
+ *
+ * <p>TODO The IIndexUsable's update due to the "merge" finishing hasn't been taken in account.
+ */
+public interface IIndexUsable {
+
+ /**
+ * add a range where index is usable.
+ *
+ * @param fullPath the path of time series
+ * @param start start timestamp
+ * @param end end timestamp
+ */
+ void addUsableRange(PartialPath fullPath, long start, long end);
+
+ /**
+ * minus a range where index is usable.
+ *
+ * @param fullPath the path of time series
+ * @param start start timestamp
+ * @param end end timestamp
+ */
+ void minusUsableRange(PartialPath fullPath, long start, long end);
+
+ /**
+ * The result format depends on "sub-matching" ({@linkplain SubMatchIndexUsability}) or
+ * "whole-matching" ({@linkplain WholeMatchIndexUsability})
+ *
+ * @return the range where index is unusable.
+ */
+ Object getUnusableRange();
+
+ void serialize(OutputStream outputStream) throws IOException;
+
+ void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+
+ class Factory {
+
+ private Factory() {
+ // hidden initializer
+ }
+
+ public static IIndexUsable createEmptyIndexUsability(PartialPath path) {
+ if (path.isFullPath()) {
+ return new SubMatchIndexUsability();
+ } else {
+ return new WholeMatchIndexUsability();
+ }
+ }
+
+ public static IIndexUsable deserializeIndexUsability(PartialPath path, InputStream inputStream)
+ throws IOException, IllegalPathException {
+ IIndexUsable res;
+ if (path.isFullPath()) {
+ res = new SubMatchIndexUsability();
+ res.deserialize(inputStream);
+ } else {
+ res = new WholeMatchIndexUsability();
+ res.deserialize(inputStream);
+ }
+ return res;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
new file mode 100644
index 0000000..6ae41d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is to record the index usable range for a single long series, which corresponds to the
+ * subsequence matching scenario.
+ *
+ * <p>The series path in the initialized parameters must be a full path (without wildcard
+ * characters).
+ *
+ * <p>It's not thread-safe.
+ */
+public class SubMatchIndexUsability implements IIndexUsable {
+
+ private int maxSizeOfUsableSegments;
+ private final boolean initAllUsable;
+ private RangeNode unusableRanges;
+ private int size;
+
+ SubMatchIndexUsability() {
+ this(IoTDBDescriptor.getInstance().getConfig().getDefaultMaxSizeOfUnusableSegments(), true);
+ }
+
+ /**
+ * Construction
+ *
+ * @param maxSizeOfUsableSegments the max size of usable segments
+ * @param initAllUsable true if the entire time range is set to "usable".
+ */
+ SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+ this.maxSizeOfUsableSegments = maxSizeOfUsableSegments;
+ this.initAllUsable = initAllUsable;
+ unusableRanges = new RangeNode(Long.MIN_VALUE, Long.MAX_VALUE, null);
+ size = 1;
+ if (initAllUsable) {
+ addUsableRange(null, 0, Long.MAX_VALUE - 1);
+ }
+ }
+
+ @Override
+ public void addUsableRange(PartialPath fullPath, long startTime, long endTime) {
+ // simplify the problem
+ if (startTime == Long.MIN_VALUE) {
+ startTime = startTime + 1;
+ }
+ if (endTime == Long.MAX_VALUE) {
+ endTime = endTime - 1;
+ }
+ RangeNode node = locateIdxByTime(startTime);
+ RangeNode prevNode = node;
+ while (node != null && node.start <= endTime) {
+ assert node.start <= node.end;
+ assert startTime <= endTime;
+ if (node.end < startTime) {
+ prevNode = node;
+ node = node.next;
+ } else if (startTime <= node.start && node.end <= endTime) {
+ // the unusable range is covered.
+ prevNode.next = node.next;
+ node = node.next;
+ size--;
+ } else if (node.start <= startTime && endTime <= node.end) {
+ if (node.start == startTime) {
+ // left aligned
+ node.start = endTime + 1;
+ prevNode = node;
+ node = node.next;
+ } else if (node.end == endTime) {
+ // right aligned
+ node.end = startTime - 1;
+ prevNode = node;
+ node = node.next;
+ }
+ // the unusable range is split. If it reaches the upper bound, not split it
+ else if (size < maxSizeOfUsableSegments) {
+ RangeNode newNode = new RangeNode(endTime + 1, node.end, node.next);
+ node.end = startTime - 1;
+ newNode.next = node.next;
+ node.next = newNode;
+ prevNode = newNode;
+ node = newNode.next;
+ size++;
+ } else {
+ // don't split, thus do nothing.
+ prevNode = node;
+ node = node.next;
+ }
+ } else if (startTime < node.start) {
+ node.start = endTime + 1;
+ prevNode = node;
+ node = node.next;
+ } else {
+ node.end = startTime - 1;
+ prevNode = node;
+ node = node.next;
+ }
+ }
+ }
+
+ @Override
+ public void minusUsableRange(PartialPath fullPath, long startTime, long endTime) {
+ // simplify the problem
+ if (startTime == Long.MIN_VALUE) {
+ startTime = startTime + 1;
+ }
+ if (endTime == Long.MAX_VALUE) {
+ endTime = endTime - 1;
+ }
+ RangeNode node = locateIdxByTime(startTime);
+ if (endTime <= node.end) {
+ return;
+ }
+ // add the unusable range into the list
+ RangeNode newNode;
+ if (startTime <= node.end + 1) {
+ // overlapping this node
+ node.end = endTime;
+ newNode = node;
+ node = newNode.next;
+ } else if (node.next.start <= endTime + 1) {
+ newNode = node.next;
+ if (startTime < newNode.start) {
+ newNode.start = startTime;
+ }
+ if (newNode.end < endTime) {
+ newNode.end = endTime;
+ }
+ node = newNode.next;
+ } else {
+ // non-overlap with former and latter nodes
+ if (size < maxSizeOfUsableSegments) {
+ // it doesn't overlap with latter node, new node is inserted
+ newNode = new RangeNode(startTime, endTime, node.next);
+ node.next = newNode;
+ size++;
+ } else {
+ // we cannot add more range, merge it with closer neighbor.
+ if (startTime - node.end < node.next.start - endTime) {
+ node.end = endTime;
+ } else {
+ node.next.start = startTime;
+ }
+ }
+ return;
+ }
+
+ // merge the overlapped list
+ while (node != null && node.start <= endTime + 1) {
+ if (newNode.end < node.end) {
+ newNode.end = node.end;
+ }
+ // delete the redundant node
+ newNode.next = node.next;
+ node = node.next;
+ size--;
+ }
+ }
+
+ @Override
+ public List<Filter> getUnusableRange() {
+ List<Filter> res = new ArrayList<>();
+ RangeNode p = this.unusableRanges;
+ while (p != null) {
+ res.add(toFilter(p.start, p.end));
+ p = p.next;
+ }
+ return res;
+ }
+
+ /**
+ * Find the latest node whose start time is less than {@code timestamp} A naive scanning search
+ * for the linked list. Further optimization is still going on. It returns a node and there is
+ * {@code node.start <= timestamp}
+ */
+ private RangeNode locateIdxByTime(long timestamp) {
+ if (unusableRanges.next == null) {
+ return unusableRanges;
+ }
+ RangeNode res = null;
+ RangeNode node = unusableRanges;
+ while (node.next != null) {
+ if (node.start >= timestamp) {
+ break;
+ }
+ res = node;
+ node = node.next;
+ }
+ return res;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(size, outputStream);
+ ReadWriteIOUtils.write(maxSizeOfUsableSegments, outputStream);
+ RangeNode.serialize(unusableRanges, outputStream);
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {
+ size = ReadWriteIOUtils.readInt(inputStream);
+ maxSizeOfUsableSegments = ReadWriteIOUtils.readInt(inputStream);
+ unusableRanges = RangeNode.deserialize(inputStream);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(String.format("size:%d,", size));
+ RangeNode node = unusableRanges;
+ while (node != null) {
+ sb.append(node.toString());
+ node = node.next;
+ }
+ return sb.toString();
+ }
+
+ private static Filter toFilter(long startTime, long endTime) {
+ return FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
+ }
+
+ public boolean hasUnusableRange() {
+ if (initAllUsable) {
+ return !(unusableRanges.end == 0
+ && unusableRanges.next != null
+ && unusableRanges.next.start == Long.MAX_VALUE - 1
+ && unusableRanges.next.next == null);
+ } else {
+ return size > 1;
+ }
+ }
+
+ private static class RangeNode {
+
+ long start;
+ long end;
+ RangeNode next;
+
+ RangeNode(long start, long end, RangeNode next) {
+ this.start = start;
+ this.end = end;
+ this.next = next;
+ }
+
+ @Override
+ public String toString() {
+ return "["
+ + (start == Long.MIN_VALUE ? "MIN" : start)
+ + ","
+ + (end == Long.MAX_VALUE ? "MAX" : end)
+ + "],";
+ }
+
+ public static void serialize(RangeNode head, OutputStream outputStream) throws IOException {
+ int listLength = 0;
+ RangeNode tmp = head;
+ while (tmp != null) {
+ listLength++;
+ tmp = tmp.next;
+ }
+ ReadWriteIOUtils.write(listLength, outputStream);
+ while (head != null) {
+ ReadWriteIOUtils.write(head.start, outputStream);
+ ReadWriteIOUtils.write(head.end, outputStream);
+ head = head.next;
+ }
+ }
+
+ public static RangeNode deserialize(InputStream inputStream) throws IOException {
+ int listLength = ReadWriteIOUtils.readInt(inputStream);
+ RangeNode fakeHead = new RangeNode(-1, -1, null);
+ RangeNode node = fakeHead;
+ for (int i = 0; i < listLength; i++) {
+ long start = ReadWriteIOUtils.readLong(inputStream);
+ long end = ReadWriteIOUtils.readLong(inputStream);
+ RangeNode nextNode = new RangeNode(start, end, null);
+ node.next = nextNode;
+ node = nextNode;
+ }
+ return fakeHead.next;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
new file mode 100644
index 0000000..d35bb81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class is to record the index usable range for a list of short series, which corresponds to
+ * the whole matching scenario.
+ *
+ * <p>The series path involves wildcard characters. One series is marked as "index-usable" or
+ * "index-unusable".
+ *
+ * <p>It's not thread-safe.
+ */
+public class WholeMatchIndexUsability implements IIndexUsable {
+
+ private final Set<PartialPath> unusableSeriesSet;
+
+ WholeMatchIndexUsability() {
+ this.unusableSeriesSet = new HashSet<>();
+ }
+
+ @Override
+ public void addUsableRange(PartialPath fullPath, long start, long end) {
+ // do nothing temporarily
+ }
+
+ @Override
+ public void minusUsableRange(PartialPath fullPath, long start, long end) {
+ unusableSeriesSet.add(fullPath);
+ }
+
+ @Override
+ public Set<PartialPath> getUnusableRange() {
+ return Collections.unmodifiableSet(this.unusableSeriesSet);
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(unusableSeriesSet.size(), outputStream);
+ for (PartialPath s : unusableSeriesSet) {
+ ReadWriteIOUtils.write(s.getFullPath(), outputStream);
+ }
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IllegalPathException, IOException {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ unusableSeriesSet.add(new PartialPath(ReadWriteIOUtils.readString(inputStream)));
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index 1f006d9..fb19c9a 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -156,6 +156,15 @@ public class PartialPath extends Path implements Comparable<Path> {
return true;
}
+ public boolean isFullPath() {
+ for (String node : nodes) {
+ if (node.equals(IoTDBConstant.PATH_WILDCARD)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public String getFullPath() {
if (fullPath != null) {
@@ -312,4 +321,13 @@ public class PartialPath extends Path implements Comparable<Path> {
}
return ret;
}
+
+ public void toLowerCase() {
+ for (int i = 0; i < nodes.length; i++) {
+ nodes[i] = nodes[i].toLowerCase();
+ }
+ fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
+ if (measurementAlias != null) measurementAlias = measurementAlias.toLowerCase();
+ if (tsAlias != null) tsAlias = tsAlias.toLowerCase();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 54a0bf7..db9d416 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -39,11 +39,15 @@ import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryIdNotExsitException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -76,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
@@ -83,6 +88,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
@@ -193,6 +199,7 @@ public class PlanExecutor implements IPlanExecutor {
protected IQueryRouter queryRouter;
// for administration
private IAuthorizer authorizer;
+ private IndexManager indexManager;
private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
@@ -200,6 +207,7 @@ public class PlanExecutor implements IPlanExecutor {
queryRouter = new QueryRouter();
try {
authorizer = BasicAuthorizer.getInstance();
+ indexManager = IndexManager.getInstance();
} catch (AuthException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -325,9 +333,9 @@ public class PlanExecutor implements IPlanExecutor {
case STOP_TRIGGER:
return operateStopTrigger((StopTriggerPlan) plan);
case CREATE_INDEX:
- throw new QueryProcessException("Create index hasn't been supported yet");
+ return createIndex((CreateIndexPlan) plan);
case DROP_INDEX:
- throw new QueryProcessException("Drop index hasn't been supported yet");
+ return dropIndex((DropIndexPlan) plan);
case KILL:
try {
operateKillQuery((KillQueryPlan) plan);
@@ -479,7 +487,7 @@ public class PlanExecutor implements IPlanExecutor {
GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
} else if (queryPlan instanceof QueryIndexPlan) {
- throw new QueryProcessException("Query index hasn't been supported yet");
+ queryDataSet = queryRouter.indexQuery((QueryIndexPlan) queryPlan, context);
} else if (queryPlan instanceof AggregationPlan) {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
queryDataSet = queryRouter.aggregate(aggregationPlan, context);
@@ -1750,4 +1758,33 @@ public class PlanExecutor implements IPlanExecutor {
}
return noExistSg;
}
+
+ private boolean createIndex(CreateIndexPlan createIndexPlan) throws QueryProcessException {
+ List<PartialPath> paths = createIndexPlan.getPaths();
+ List<PartialPath> partialPaths = new ArrayList<>(paths);
+ long startTime = createIndexPlan.getTime();
+ IndexType indexType = createIndexPlan.getIndexType();
+ Map<String, String> props = createIndexPlan.getProps();
+ IndexInfo indexInfo = new IndexInfo(indexType, startTime, props);
+ try {
+ IndexManager.getInstance().createIndex(partialPaths, indexInfo);
+ } catch (MetadataException e) {
+ throw new IndexManagerException(e);
+ }
+ return true;
+ }
+
+ private boolean dropIndex(DropIndexPlan dropIndexPlan) throws QueryProcessException {
+ List<PartialPath> paths = dropIndexPlan.getPaths();
+ List<PartialPath> partialPaths = new ArrayList<>(paths);
+ IndexType indexType = dropIndexPlan.getIndexType();
+ try {
+ IndexManager.getInstance().dropIndex(partialPaths, indexType);
+ } catch (MetadataException e) {
+ throw new IndexManagerException(e);
+ } catch (IOException e2) {
+ throw new IndexManagerException(e2.getMessage());
+ }
+ return true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 27d20f1..9410c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -28,6 +28,7 @@ public class QueryIndexPlan extends RawDataQueryPlan {
private Map<String, Object> props;
private IndexType indexType;
+ private boolean alignedByTime = false;
public QueryIndexPlan() {
super();
@@ -50,6 +51,14 @@ public class QueryIndexPlan extends RawDataQueryPlan {
this.props = props;
}
+ public boolean isAlignedByTime() {
+ return alignedByTime;
+ }
+
+ public void setAlignedByTime(boolean alignedByTime) {
+ this.alignedByTime = alignedByTime;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
index aef7399..989aa10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -65,4 +67,14 @@ public interface IQueryRouter {
/** Execute UDTF query */
QueryDataSet udtfQuery(UDTFPlan udtfPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException, InterruptedException;
+
+ /**
+ * Query executor
+ *
+ * @param queryPlan
+ * @param context
+ * @return
+ */
+ QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+ throws QueryIndexException, StorageEngineException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 3478c6c..fd67530 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -20,13 +20,16 @@
package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.read.QueryIndexExecutor;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -128,7 +131,7 @@ public class QueryRouter implements IQueryRouter {
AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
- QueryDataSet dataSet = null;
+ QueryDataSet dataSet;
if (optimizedExpression != null
&& optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
@@ -252,6 +255,13 @@ public class QueryRouter implements IQueryRouter {
return lastQueryExecutor.execute(context, lastQueryPlan);
}
+ @Override
+ public QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+ throws QueryIndexException, StorageEngineException {
+ QueryIndexExecutor executor = new QueryIndexExecutor(queryPlan, context);
+ return executor.executeIndexQuery();
+ }
+
protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
return new LastQueryExecutor(lastQueryPlan);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index c2368c4..7332945 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.index.IndexManager;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.query.control.TracingManager;
@@ -116,6 +117,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());
+ registerManager.register(IndexManager.getInstance());
registerManager.register(RPCService.getInstance());
if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index b474b2c..0704e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -50,7 +50,10 @@ public enum ServiceType {
FLUSH_SERVICE(
"Flush ServerService", generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
- CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor");
+ CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor"),
+
+ INDEX_SERVICE(
+ "Index ServerService", generateJmxName("org.apache.iotdb.db.index", "Index Manager"));
private final String name;
private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 952a86c..28142d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.qp.Planner;
@@ -58,6 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
@@ -667,6 +669,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setIgnoreTimeStamp(false);
}
+ if (plan instanceof QueryIndexPlan) {
+ resp.setColumns(
+ newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+ resp.setDataTypeList(
+ newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+ resp.setColumnNameIndexMap(((IndexQueryDataSet) newDataSet).getPathToIndex());
+ }
if (newDataSet instanceof DirectNonAlignDataSet) {
resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
} else {
@@ -847,6 +856,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
.getOutputDataType());
}
break;
+ case QUERY_INDEX:
+ // For query index, we don't know the columns before actual query.
+ // It will be deferred after obtaining query result set.
+ break;
default:
throw new TException("unsupported query type: " + plan.getOperatorType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..82a521c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -571,4 +572,32 @@ public abstract class TVList {
public long getLastTime() {
return getTime(size - 1);
}
+
+ @TestOnly
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for (int i = 0; i < size; i++) {
+ TimeValuePair pair = getTimeValuePair(i);
+ switch (getDataType()) {
+ case INT32:
+ sb.append(String.format("[%d,%d],", pair.getTimestamp(), pair.getValue().getInt()));
+ break;
+ case INT64:
+ sb.append(String.format("[%d,%d],", pair.getTimestamp(), pair.getValue().getLong()));
+ break;
+ case FLOAT:
+ sb.append(String.format("[%d,%.2f],", pair.getTimestamp(), pair.getValue().getFloat()));
+ break;
+ case DOUBLE:
+ sb.append(String.format("[%d,%.2f],", pair.getTimestamp(), pair.getValue().getDouble()));
+ break;
+ default:
+ throw new NotImplementedException(getDataType().toString());
+ }
+ }
+ sb.append("}");
+ return sb.toString();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..6f545b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -221,7 +221,8 @@ public class TsFileRecoverPerformer {
new MemTableFlushTask(
recoverMemTable,
restorableTsFileIOWriter,
- tsFileResource.getTsFile().getParentFile().getParentFile().getName());
+ tsFileResource.getTsFile().getParentFile().getParentFile().getName(),
+ sequence);
tableFlushTask.syncFlushMemTable();
tsFileResource.updatePlanIndexes(recoverMemTable.getMinPlanIndex());
tsFileResource.updatePlanIndexes(recoverMemTable.getMaxPlanIndex());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index a1b5d55..0576236 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -69,7 +69,8 @@ public class MemTableFlushTaskTest {
MemTableTestUtils.deviceId0,
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+ MemTableFlushTask memTableFlushTask =
+ new MemTableFlushTask(memTable, writer, storageGroup, true);
assertTrue(
writer
.getVisibleMetadataList(
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
new file mode 100644
index 0000000..a9fe55f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.it;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.math.Randomwalk;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+import static org.junit.Assert.fail;
+
+public class NoIndexWriteIT {
+
+ private static final String insertPattern = "INSERT INTO %s(timestamp, %s) VALUES (%d, %.3f)";
+
+ private static final String storageGroupSub = "root.wind1";
+ private static final String storageGroupWhole = "root.wind2";
+
+ private static final String speed1 = "root.wind1.azq01.speed";
+ private static final String speed1Device = "root.wind1.azq01";
+ private static final String speed1Sensor = "speed";
+
+ private static final String directionDevicePattern = "root.wind2.%d";
+ private static final String directionPattern = "root.wind2.%d.direction";
+ private static final String directionSensor = "direction";
+
+ private static final String indexSub = speed1;
+ private static final String indexWhole = "root.wind2.*.direction";
+ private static final int wholeSize = 5;
+ private static final int wholeDim = 100;
+ private static final int subLength = 10_000;
+
+ @Before
+ public void setUp() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableIndex(true);
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setEnableIndex(false);
+ }
+
+ @Test
+ public void checkWrite() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ // IoTDBDescriptor.getInstance().getConfig().setEnableIndex(false);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement(); ) {
+ long start = System.currentTimeMillis();
+ statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupSub));
+ statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupWhole));
+
+ System.out.println(
+ String.format("CREATE TIMESERIES %s WITH DATATYPE=FLOAT,ENCODING=PLAIN", speed1));
+ statement.execute(
+ String.format("CREATE TIMESERIES %s WITH DATATYPE=FLOAT,ENCODING=PLAIN", speed1));
+
+ for (int i = 0; i < wholeSize; i++) {
+ String wholePath = String.format(directionPattern, i);
+ statement.execute(
+ String.format("CREATE TIMESERIES %s WITH DATATYPE=FLOAT,ENCODING=PLAIN", wholePath));
+ }
+ long startCreateIndex = System.currentTimeMillis();
+ statement.execute(String.format("CREATE INDEX ON %s WITH INDEX=%s", indexSub, NO_INDEX));
+ statement.execute(String.format("CREATE INDEX ON %s WITH INDEX=%s", indexWhole, NO_INDEX));
+
+ System.out.println(IndexManager.getInstance().getRouter());
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ IndexManager.getInstance().getRouter().toString());
+ TVList subInput = Randomwalk.generateRanWalkTVList(subLength);
+ long startInsertSub = System.currentTimeMillis();
+ for (int i = 0; i < subInput.size(); i++) {
+ statement.execute(
+ String.format(
+ insertPattern,
+ speed1Device,
+ speed1Sensor,
+ subInput.getTime(i),
+ subInput.getDouble(i)));
+ }
+ statement.execute("flush");
+ System.out.println("insert finish for subsequence case");
+ System.out.println(IndexManager.getInstance().getRouter());
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ IndexManager.getInstance().getRouter().toString());
+
+ long startInsertWhole = System.currentTimeMillis();
+ TVList wholeInput = Randomwalk.generateRanWalkTVList(wholeDim * wholeSize);
+ for (int i = 0; i < wholeSize; i++) {
+ String device = String.format(directionDevicePattern, i);
+ statement.execute(
+ String.format(
+ insertPattern,
+ device,
+ directionSensor,
+ wholeInput.getTime(i),
+ wholeInput.getDouble(i)));
+ }
+ statement.execute("flush");
+ long end = System.currentTimeMillis();
+ System.out.println(IndexManager.getInstance().getRouter());
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ IndexManager.getInstance().getRouter().toString());
+ System.out.println("insert finish for subsequence case");
+ System.out.println(String.format("create series use %d ms", (startCreateIndex - start)));
+ System.out.println(
+ String.format("create index use %d ms", (startInsertSub - startCreateIndex)));
+ System.out.println(
+ String.format("insert sub use %d ms", (startInsertWhole - startInsertSub)));
+ System.out.println(String.format("insert whole use %d ms", (end - startInsertWhole)));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
new file mode 100644
index 0000000..69af9e9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_SLIDE_STEP;
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_WINDOW_RANGE;
+import static org.apache.iotdb.db.index.common.IndexConstant.PAA_DIM;
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+
+public class ProtoIndexRouterTest {
+
+ private static final String storageGroupSub = "root.wind1";
+ private static final String storageGroupFull = "root.wind2";
+ private static final String speed1 = "root.wind1.azq01.speed";
+ private static final String direction1 = "root.wind2.1.direction";
+ private static final String direction2 = "root.wind2.2.direction";
+ private static final String direction3 = "root.wind2.3.direction";
+ private static final String index_sub = speed1;
+ private static final String index_full = "root.wind2.*.direction";
+ private IndexInfo infoFull;
+ private IndexInfo infoSub;
+ private CreateIndexProcessorFunc fakeCreateFunc;
+
+ private void prepareMManager() throws MetadataException {
+ MManager mManager = MManager.getInstance();
+ mManager.init();
+ mManager.setStorageGroup(new PartialPath(storageGroupSub));
+ mManager.setStorageGroup(new PartialPath(storageGroupFull));
+ mManager.createTimeseries(
+ new PartialPath(speed1),
+ TSDataType.FLOAT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ mManager.createTimeseries(
+ new PartialPath(direction1),
+ TSDataType.FLOAT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ mManager.createTimeseries(
+ new PartialPath(direction2),
+ TSDataType.FLOAT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ mManager.createTimeseries(
+ new PartialPath(direction3),
+ TSDataType.FLOAT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null);
+ Map<String, String> props_sub = new HashMap<>();
+ props_sub.put(INDEX_WINDOW_RANGE, "4");
+ props_sub.put(INDEX_SLIDE_STEP, "8");
+
+ Map<String, String> props_full = new HashMap<>();
+ props_full.put(PAA_DIM, "5");
+ this.infoSub = new IndexInfo(NO_INDEX, 0, props_sub);
+ this.infoFull = new IndexInfo(NO_INDEX, 5, props_full);
+ this.fakeCreateFunc =
+ (indexSeries, indexInfoMap) ->
+ new IndexProcessor(
+ indexSeries, testRouterDir + File.separator + "index_fake_" + indexSeries);
+ }
+
+ private static final String testRouterDir = "test_protoIndexRouter";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ if (!new File(testRouterDir).exists()) {
+ new File(testRouterDir).mkdirs();
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ if (new File(testRouterDir).exists()) {
+ FileUtils.deleteDirectory(new File(testRouterDir));
+ }
+ }
+
+ @Test
+ public void testBasic() throws MetadataException, IOException {
+ prepareMManager();
+ ProtoIndexRouter router = new ProtoIndexRouter(testRouterDir);
+
+ router.addIndexIntoRouter(new PartialPath(index_sub), infoSub, fakeCreateFunc, true);
+ router.addIndexIntoRouter(new PartialPath(index_full), infoFull, fakeCreateFunc, true);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ router.toString());
+ // select by storage group
+ ProtoIndexRouter sgRouters =
+ (ProtoIndexRouter) router.getRouterByStorageGroup(storageGroupFull);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n",
+ sgRouters.toString());
+
+ // serialize
+ router.serialize(false);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ router.toString());
+ router.serialize(true);
+ Assert.assertEquals("", router.toString());
+
+ IIndexRouter newRouter = new ProtoIndexRouter(testRouterDir);
+ newRouter.deserializeAndReload(fakeCreateFunc);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+ + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+ + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ newRouter.toString());
+
+ // delete index
+ newRouter.removeIndexFromRouter(new PartialPath(index_full), NO_INDEX);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ newRouter.toString());
+ newRouter.removeIndexFromRouter(new PartialPath(index_full), NO_INDEX);
+ Assert.assertEquals(
+ "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+ + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+ newRouter.toString());
+
+ newRouter.removeIndexFromRouter(new PartialPath(index_sub), NO_INDEX);
+ Assert.assertEquals("", newRouter.toString());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..b177a14
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class SubMatchIndexUsabilityTest {
+
+ @Test
+ public void addUsableRange1() throws IOException {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+ // split range
+ usability.addUsableRange(null, 1, 9);
+ Assert.assertEquals("size:2,[MIN,0],[10,MAX],", usability.toString());
+ System.out.println(usability);
+ usability.addUsableRange(null, 21, 29);
+ Assert.assertEquals("size:3,[MIN,0],[10,20],[30,MAX],", usability.toString());
+ System.out.println(usability);
+ usability.addUsableRange(null, 41, 49);
+ Assert.assertEquals("size:4,[MIN,0],[10,20],[30,40],[50,MAX],", usability.toString());
+ System.out.println(usability);
+ // cover a range
+ usability.addUsableRange(null, 29, 45);
+ Assert.assertEquals("size:3,[MIN,0],[10,20],[50,MAX],", usability.toString());
+ System.out.println(usability);
+
+ // split a range
+ usability.addUsableRange(null, 14, 17);
+ Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability.toString());
+ System.out.println(usability);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ usability.serialize(out);
+ InputStream in = new ByteArrayInputStream(out.toByteArray());
+ SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+ usability2.deserialize(in);
+ Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability2.toString());
+ }
+
+ @Test
+ public void addUsableRange2() throws IOException {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+ usability.addUsableRange(null, 1, 19);
+ usability.addUsableRange(null, 51, 59);
+ usability.addUsableRange(null, 81, 99);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,0],[20,50],[60,80],[100,MAX],", usability.toString());
+ // left cover
+ usability.addUsableRange(null, 10, 29);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,0],[30,50],[60,80],[100,MAX],", usability.toString());
+ // right cover
+ usability.addUsableRange(null, 71, 99);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,0],[30,50],[60,70],[100,MAX],", usability.toString());
+ // left cover multiple
+ usability.addUsableRange(null, 20, 80);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability.toString());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ usability.serialize(out);
+ InputStream in = new ByteArrayInputStream(out.toByteArray());
+ SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+ usability2.deserialize(in);
+ Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability2.toString());
+ }
+
+ @Test
+ public void minusUsableRange() throws IOException {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+
+ // merge with MIN, MAX
+ usability.minusUsableRange(null, 1, 19);
+ System.out.println(usability);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+ // new range is covered by the first range
+ usability.addUsableRange(null, 51, 89);
+ usability.minusUsableRange(null, 1, 19);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,50],[90,MAX],", usability.toString());
+
+ usability.addUsableRange(null, 101, 259);
+ // new range extend node's end time
+ usability.minusUsableRange(null, 51, 60);
+ System.out.println(usability);
+ Assert.assertEquals("size:3,[MIN,60],[90,100],[260,MAX],", usability.toString());
+
+ // new range extend node's start time
+ usability.minusUsableRange(null, 80, 89);
+ System.out.println(usability);
+ Assert.assertEquals("size:3,[MIN,60],[80,100],[260,MAX],", usability.toString());
+
+ // new range is inserted as a individual node [120, 140]
+ usability.minusUsableRange(null, 120, 140);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+ // re-insert: new range is totally same as an exist node
+ usability.minusUsableRange(null, 120, 140);
+ System.out.println(usability);
+ Assert.assertTrue(usability.hasUnusableRange());
+ Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+ // re-insert: new range extend the both sides of an exist node
+ usability.minusUsableRange(null, 110, 150);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[260,MAX],", usability.toString());
+
+ // an isolate range but the segmentation number reaches the upper bound, thus merge the range
+ // with a closer neighbor.
+ usability.minusUsableRange(null, 200, 220);
+ System.out.println(usability);
+ Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[200,MAX],", usability.toString());
+
+ // a range covers several node.
+ usability.minusUsableRange(null, 50, 90);
+ System.out.println(usability);
+ Assert.assertEquals("size:3,[MIN,100],[110,150],[200,MAX],", usability.toString());
+
+ // a range covers several node.
+ usability.minusUsableRange(null, 105, 200);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,100],[105,MAX],", usability.toString());
+ Assert.assertTrue(usability.hasUnusableRange());
+ // the end
+ usability.minusUsableRange(null, 101, 107);
+ System.out.println(usability);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+ Assert.assertFalse(usability.hasUnusableRange());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ usability.serialize(out);
+ InputStream in = new ByteArrayInputStream(out.toByteArray());
+ SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+ usability2.deserialize(in);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability2.toString());
+ }
+
+ @Test
+ public void subMatchingToFilter() {
+ SubMatchIndexUsability a = new SubMatchIndexUsability(10, false);
+ Assert.assertFalse(a.hasUnusableRange());
+ a.addUsableRange(null, 31, 39);
+ a.addUsableRange(null, 61, 89);
+ System.out.println(a);
+
+ Assert.assertTrue(a.hasUnusableRange());
+ List<Filter> res = a.getUnusableRange();
+ Assert.assertEquals(3, res.size());
+ System.out.println(res);
+ Assert.assertEquals(
+ "[(time >= -9223372036854775808 && time <= 30), (time >= 40 && time <= 60), (time >= 90 && time <= 9223372036854775807)]",
+ res.toString());
+ }
+
+ @Test
+ public void minusUsableRange2() {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+ Assert.assertFalse(usability.hasUnusableRange());
+ usability.addUsableRange(null, 0, 15);
+ usability.minusUsableRange(null, 20, 24);
+ Assert.assertEquals("size:2,[MIN,-1],[16,MAX],", usability.toString());
+ Assert.assertTrue(usability.hasUnusableRange());
+ System.out.println(usability.toString());
+ }
+
+ @Test
+ public void reachUpBoundAndStopSplit() {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(2, false);
+ System.out.println(usability);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+ usability.addUsableRange(null, 5, 7);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+ usability.addUsableRange(null, 2, 3);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+ usability.minusUsableRange(null, 6, 6);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,4],[6,MAX],", usability.toString());
+ usability.addUsableRange(null, 6, 9);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,4],[10,MAX],", usability.toString());
+ usability.minusUsableRange(null, 6, 6);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,6],[10,MAX],", usability.toString());
+ usability.addUsableRange(null, 2, 6);
+ System.out.println(usability);
+ Assert.assertEquals("size:2,[MIN,1],[10,MAX],", usability.toString());
+ }
+
+ @Test
+ public void testBoundCase() {
+ SubMatchIndexUsability usability = new SubMatchIndexUsability(2, true);
+ usability.addUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+ System.out.println(usability);
+ Assert.assertEquals(
+ "size:2,[MIN,-9223372036854775808],[9223372036854775807,MAX],", usability.toString());
+ usability.minusUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+ System.out.println(usability);
+ Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+ Assert.assertTrue(usability.hasUnusableRange());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..edf49fb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+public class WholeMatchIndexUsabilityTest {
+
+ @Test
+ public void testMinusUsableRange() throws IllegalPathException, IOException {
+ WholeMatchIndexUsability usability = new WholeMatchIndexUsability();
+ // do nothing for addUsableRange
+ usability.addUsableRange(new PartialPath("root.sg.d.s10"), 1, 2);
+ usability.addUsableRange(new PartialPath("root.sg.d.s11"), 1, 2);
+
+ usability.minusUsableRange(new PartialPath("root.sg.d.s1"), 1, 2);
+ usability.minusUsableRange(new PartialPath("root.sg.d.s2"), 1, 2);
+ usability.minusUsableRange(new PartialPath("root.sg.d.s3"), 1, 2);
+ Set<PartialPath> ret = usability.getUnusableRange();
+ Assert.assertEquals("[root.sg.d.s3, root.sg.d.s2, root.sg.d.s1]", ret.toString());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ usability.serialize(out);
+ InputStream in = new ByteArrayInputStream(out.toByteArray());
+ WholeMatchIndexUsability usable2 = new WholeMatchIndexUsability();
+ usable2.deserialize(in);
+ Set<PartialPath> ret2 = usability.getUnusableRange();
+ Assert.assertEquals("[root.sg.d.s3, root.sg.d.s2, root.sg.d.s1]", ret2.toString());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index ab7ddf3..dd36009 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.index.IndexManager;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -121,6 +122,11 @@ public class EnvironmentUtils {
fail();
}
+ // clean index
+ if (config.isEnableIndex()) {
+ IndexManager.getInstance().deleteAll();
+ }
+
IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
// clean cache
@@ -211,6 +217,8 @@ public class EnvironmentUtils {
cleanDir(config.getQueryDir());
// delete tracing
cleanDir(config.getTracingDir());
+ // delete index
+ cleanDir(config.getIndexRootFolder());
// delete data files
for (String dataDir : config.getDataDirs()) {
cleanDir(dataDir);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f2fd3c1..a90dcad 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,6 +65,7 @@ public enum TSStatusCode {
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),
+ INDEX_QUERY_ERROR(423),
INTERNAL_SERVER_ERROR(500),
CLOSE_OPERATION_ERROR(501),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 014d45a..02f1d16 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -128,6 +128,30 @@ public class ReadWriteIOUtils {
}
}
+ public static int write(Map<String, String> map, OutputStream stream) throws IOException {
+ int length = 0;
+ write(map.size(), stream);
+ length += 4;
+ for (Entry<String, String> entry : map.entrySet()) {
+ length += write(entry.getKey(), stream);
+ length += write(entry.getValue(), stream);
+ }
+ return length;
+ }
+
+ public static Map<String, String> readMap(InputStream inputStream) throws IOException {
+ int length = readInt(inputStream);
+ Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ String key = readString(inputStream);
+ // value
+ String value = readString(inputStream);
+ map.put(key, value);
+ }
+ return map;
+ }
+
public static int write(Map<String, String> map, ByteBuffer buffer) {
int length = 0;
byte[] bytes;
@@ -701,6 +725,29 @@ public class ReadWriteIOUtils {
return bytes;
}
+ /**
+ * read bytes from an inputStream where the length is specified at the head of the inputStream.
+ * Make sure {@code inputStream} contains an integer numeric (the first 4 bytes) indicating the
+ * length of the following data.
+ *
+ * @param inputStream contains a length and a stream
+ * @return bytebuffer
+ * @throws IOException if the read length doesn't equal to the self description length.
+ */
+ public static ByteBuffer readByteBufferWithSelfDescriptionLength(InputStream inputStream)
+ throws IOException {
+ int length = readInt(inputStream);
+ byte[] bytes = new byte[length];
+ int readLen = inputStream.read(bytes);
+ if (readLen != length) {
+ throw new IOException(String.format(RETURN_ERROR, length, readLen));
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(length);
+ byteBuffer.put(bytes);
+ byteBuffer.flip();
+ return byteBuffer;
+ }
+
/** read bytes from buffer with offset position to the end of buffer. */
public static int readAsPossible(TsFileInput input, long position, ByteBuffer buffer)
throws IOException {