You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2021/03/02 11:01:50 UTC

[iotdb] 01/01: Index framework: part 2

This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch index_manager_part2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b0639d57bc4e6901fd482e36c0a3a71c7e2387e9
Author: kr11 <3095717866.com>
AuthorDate: Tue Mar 2 19:00:04 2021 +0800

    Index framework: part 2
---
 docs/zh/SystemDesign/Index/Index.md                | 672 +++++++++++++++++++++
 .../resources/conf/iotdb-engine.properties         |   6 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  33 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  34 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +-
 .../db/exception/index/QueryIndexException.java    |   5 +
 ...ion.java => UnsupportedIndexFuncException.java} |  10 +-
 .../iotdb/db/index/IndexBuildTaskPoolManager.java  |  78 +++
 .../org/apache/iotdb/db/index/IndexManager.java    | 329 ++++++++++
 .../apache/iotdb/db/index/IndexManagerMBean.java   |  21 +
 .../iotdb/db/index/IndexMemTableFlushTask.java     |  75 +++
 .../org/apache/iotdb/db/index/IndexProcessor.java  | 514 ++++++++++++++++
 .../iotdb/db/index/algorithm/IoTDBIndex.java       | 182 ++++++
 .../apache/iotdb/db/index/algorithm/NoIndex.java   |  68 +++
 .../apache/iotdb/db/index/common/DistSeries.java   |  39 ++
 .../iotdb/db/index/common/IndexConstant.java       |  54 ++
 .../apache/iotdb/db/index/common/IndexInfo.java    | 137 +++++
 .../db/index/common/IndexProcessorStruct.java      |  52 ++
 .../apache/iotdb/db/index/common/IndexType.java    |  56 +-
 .../apache/iotdb/db/index/common/IndexUtils.java   |  75 +++
 .../common/func/CreateIndexProcessorFunc.java      |  39 ++
 .../iotdb/db/index/common/func/IndexNaiveFunc.java |  34 ++
 .../iotdb/db/index/common/math/Randomwalk.java     |  44 ++
 .../common/math/probability/Probability.java}      |  12 +-
 .../common/math/probability/UniformProba.java}     |  24 +-
 .../db/index/feature/IndexFeatureExtractor.java    | 106 ++++
 .../iotdb/db/index/read/IndexQueryDataSet.java     |  41 ++
 .../iotdb/db/index/read/QueryIndexExecutor.java    |  56 ++
 .../optimize/IIndexCandidateOrderOptimize.java     |  38 ++
 .../read/optimize/NoCandidateOrderOptimizer.java   |  21 +
 .../apache/iotdb/db/index/router/IIndexRouter.java | 129 ++++
 .../iotdb/db/index/router/ProtoIndexRouter.java    | 424 +++++++++++++
 .../apache/iotdb/db/index/usable/IIndexUsable.java |  98 +++
 .../db/index/usable/SubMatchIndexUsability.java    | 311 ++++++++++
 .../db/index/usable/WholeMatchIndexUsability.java  |  79 +++
 .../org/apache/iotdb/db/metadata/PartialPath.java  |  18 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  43 +-
 .../iotdb/db/qp/physical/crud/QueryIndexPlan.java  |   9 +
 .../iotdb/db/query/executor/IQueryRouter.java      |  12 +
 .../iotdb/db/query/executor/QueryRouter.java       |  12 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   5 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  13 +
 .../iotdb/db/utils/datastructure/TVList.java       |  29 +
 .../writelog/recover/TsFileRecoverPerformer.java   |   3 +-
 .../db/engine/memtable/MemTableFlushTaskTest.java  |   3 +-
 .../apache/iotdb/db/index/it/NoIndexWriteIT.java   | 162 +++++
 .../db/index/router/ProtoIndexRouterTest.java      | 179 ++++++
 .../index/usable/SubMatchIndexUsabilityTest.java   | 233 +++++++
 .../index/usable/WholeMatchIndexUsabilityTest.java |  55 ++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  47 ++
 54 files changed, 4700 insertions(+), 44 deletions(-)

diff --git a/docs/zh/SystemDesign/Index/Index.md b/docs/zh/SystemDesign/Index/Index.md
new file mode 100644
index 0000000..cf0fba9
--- /dev/null
+++ b/docs/zh/SystemDesign/Index/Index.md
@@ -0,0 +1,672 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# 相似性索引框架文档
+
+[TOC]
+
+## 第1章 任务目标
+
+### 背景
+
+在最近的二十年间,谷歌一系列重要工作的发表 (GFS ,BigTable ,Map-Reduce 等),以及相应的开源框架的兴起 (Hadoop,HBase 等) 推动了大数据技术的兴起和蓬勃发展。以BigTable,HBase,Cassandra 为代表的NoSQL数据库系统对新一代数据库技术产生了巨大的影响。在这一背景下,为了管理气象、医疗、工业物联网等领域产生的大量时间序列数据,时序数据库系统也得到了快速发展 。
+
+然而,现有数据库系统通常仅支持经典查询(点查询,范围查询和带值过滤条件的查询),常见的聚合查询(最大值,最小值,均值,求和等)和少量领域相关操作(如按时间间隔聚合,按时间戳对齐等)等等。
+在传统的关系型数据库系统中,如Oracle和MySQL,往往只集成了几种被广泛使用的索引技术,如用于加速范围查询的B树和加速等值查询的哈希表。很多当前主流的时序数据库系统(如InfluxDB,OpenTSDB、Apache IoTDB)仍不支持时间序列的相似性索引。InfluxDB提出了时间序列索引(Time Series Index,TSI)。但主要用于对海量时间序列的路径、标签等元信息的快速查找。在机器学习领域,开发者提出了许多索引代码库,但主要面向近似检索,同时并未与时序数据库系统进行集成。基于时序数据库系统的索引机制仍然是有待解决的问题。
+
+针对时序数据的相似性索引一直是非常重要的研究课题。研究者们提出了一系列索引结构,致力于解决全序列检索和子序列检索问题。然而,研究者提出的索引技术通常以独立项目的形式存在。在应用于生产场景时,需要全面考虑数据采集/存储、索引构建/查询接口、数据更新、数据容灾等问题。时序数据库系统具有规范化的查询语言、丰富而健壮的读写接口、高效的数据读写性能以及较为完善的容灾策略。将相似性索引技术集成到时序数据库系统中,不仅丰富了数据库系统的功能,同时也会简化索引在面对复杂应用场景时的适配工作,提升索引的易用性,有助于索引技术在生产实践中落地。然而,数据库系统是复杂的,与数据库系统的对接必须综合考虑SQL定义、执行计划、读写流程以及返回格式定义等技术细节。这些复杂的问题制约了索引与数据库系统的深度集成。
+
+
+
+#### 目标
+
+使IoTDB支持时间序列相似性检索功能,并集成包括复合模式索引、R树索引在内的多种索引技术。
+
+## 详细设计
+
+### 基本概念
+
+#### 什么是“相似性检索”
+
+相似性搜索是时间序列领域中最重要的方向之一。与IoTDB中现有的查询条件不同,相似性搜索输入一条“序列”作为查询,目的是高效地从数据库中找到一系列与“查询序列”相似的其他序列。
+
+所谓“相似”:两条序列之间的“相似性”取决于它们的“欧氏距离”或其他距离函数。
+
+如下图形象地展示了如何用“欧氏距离(Euclidean)”和“动态时间规整距离(DTW)”度量两条时间序列的相似性:
+
+![image-20210225015048345](https://user-images.githubusercontent.com/5548915/109539640-849a8b00-7afc-11eb-8c95-b666d6dac2ca.png)
+
+
+
+#### 什么是“特征索引”
+
+在关系型数据库中,一些经典的索引结构,如B树和哈希表,都是对关系表中的属性创建索引。而相似性索引的检索对象是高维序列。由于高维序列存在维度灾难问题,相似性索引需要首先将高维序列转换成低维特征,然后对低维特征创建索引。如果将关系型数据库系统中的B树和哈希表称为“基于属性的索引”,那么时序数据库系统中的相似性索引可以被视为“基于特征的索引”。
+
+数据库系统中的特征索引从逻辑上可以被划分为三层结构,自下而上依次是数据层、特征层和索引层:
+
+ * 数据层:对应着数据空间中的原始序列。 在IoTDB中,系统接收大量传感器数据并将其组织成TsFile文件格式写入磁盘;
+ * 特征层:对应着特征空间中的低维特征。 每条特征的大小通常远小于原始时间序列。 一条特征记录包含两部分信息:特征向量和指向数据文件的指针;
+ * 索引层:对应着索引结构。通过额外的内存或磁盘结构,索引结构可以对特征进行有效的组织。索引结构的大小通常远小于由它组织的特征和原始序列。在索引结构底层的索引项中包含了一个或多个指向特征的指针;
+
+在构造索引时,索引机制首先将数据层中的原始序列转换为特征,然后写入索引结构中。在执行查询时,索引机制将查询序列转换为特征,然后输入索引结构。通过索引结构和特征信息对被检索集进行剪枝,得到候选集,候选集中包含了一系列指向原始数据的指针;然后访问候选集所对应的原始数据,得到最终的查询结果。
+
+###  功能定义
+
+索引框架将满足两类使用者的需求:
+
+* 对数据库管理员和用户来说,索引机制设计了**索引创建、查询和删除的SQL语句**,并设计了与IoTDB其他查询相一致的返回结果格式。
+* 对索引实现者来说,索引机制暴露出一套与索引技术密切相关的接口,包括特征选型、索引插入和索引过滤等等。索引实现者可以在不必关心数据库系统技术细节的情况下,便捷地集成各种索引技术。
+
+#### 用户功能:面向用户和DBA
+
+> 注:面向用户和DBA的代码主要涉及SQL和查询计划的改动,这部分代码已经合并到主分支中,参见[PR-2024(IoTDB-804)](https://github.com/apache/iotdb/pull/2024)。
+
+本节将从两个工业场景中的示例出发,阐述两种检索场景(全序列检索和子序列检索)的使用方式。
+
+##### 全序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540349-4782c880-7afd-11eb-9c67-ae83697f77d3.png)
+
+上图展示了制药工厂在IoTDB中的数据库模式。为了简洁清晰,图中省略了部分与本节无关的信息。工厂包含多个发酵罐(Ferm01、Ferm02等等),每个发酵罐逐批次地进行红霉素发酵工作,每个批次持续时间为7天。对于每个批次(如Ferm02发酵罐的20191017批次),工厂会对发酵过程中的物理量进行记录,包括葡萄糖补糖率(Glu)、二氧化碳释放率(CER)、酸碱度(pH)等等。
+
+分析人员希望为所有发酵罐、所有批次的补糖率序列(Glu)创建基于“PAA特征的R树索引”(记为RTREE_PAA)。创建索引格式如下:
+
+```sql
+CREATE INDEX ON seriesPath
+WITH INDEX=indexName (, key=value)* 
+```
+
+除了必须指定索引名之外,还需要传入一系列创建索引所需的参数。在本案例中,创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Ery.*.Glu 
+WITH INDEX=RTREE_PAA, PAA_DIM=8
+```
+
+索引类别为RTREE_PAA,即基于PAA特征的R树索引:将时间序列转换为8维PAA特征,然后交由R树组织。
+
+创建索引语句不限制索引参数的数量和格式,以此保证良好的可拓展性。用户提交的参数将由索引注册表记录。当索引实例被加载到内存中时,索引机制会将参数信息传给索引实例。
+
+分析人员希望在所有发酵罐的所有批次中,寻找与某种“补糖策略”最接近的2条补糖率曲线,查询语句如下:
+
+```sql
+SELECT TOP 2 Glu 
+FROM root.Ery.* 
+WHERE Glu LIKE (0, 120, 20, 80, ..., 120, 100, 80, 0)
+```
+
+通过关键字LIKE,IoTDB的查询处理器将这条查询语句交给索引机制执行。
+为了与IoTDB的经典查询结果格式保持一致,全序列检索的每一条结果单独成列,并按照时间戳顺序对齐。如下图所示,与给定查询序列最接近的两条序列分别是发酵罐Ferm01的20191018批次和发酵罐Ferm02的20191024批次。两条序列按照时间戳对齐。
+
+![image](https://user-images.githubusercontent.com/5548915/109540059-f70b6b00-7afc-11eb-938e-d921a9a5d667.png)
+
+索引机制允许用户删除一个被创建的索引,删除索引格式如下:
+
+```sql
+DROP INDEX indexName ON seriesPath
+```
+
+删除索引的格式非常简单。在本例中,分析人员通过以下语句删除索引:
+
+```sql
+DROP INDEX RTREE_PAA ON root.Ery.*.Glu 
+```
+
+##### 子序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540439-65502d80-7afd-11eb-87d6-cf9b7e46ecfd.png)
+
+上图展示了风力发电厂在IoTDB中的数据库模式。风电厂包含多个风机(AZQ01、AZQ02等等),每个风机对风机状态和周围环境进行持续监测。监测传感器包括风速(Speed)、风向(Direction)、风机功率(Power)等等。
+
+分析人员希望为风机AZQ02的风速传感器序列(Speed)创建“等长数据块索引”。创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Wind.AZQ02.Speed
+WITH INDEX=ELB_INDEX, Block_Size=5
+```
+
+创建索引后,分析人员希望在AZQ02风机的风速序列中寻找与"极端运行阵风"相似的子序列。其中极端运行阵风为复合模式,在风速高峰期阈值要求小于4,在其他区间段阈值要求小于2。查询语句如下:
+
+```sql
+SELECT Speed.* FROM root.Wind.AZQ02
+WHERE Speed 
+CONTAIN (15, 14, 12, ..., 12, 12, 11) WITH TOLERANCE 2
+CONCAT  (10, 20, 25, ..., 24, 14, 8) WITH TOLERANCE 4
+CONCAT  (8, 9, 10, ..., 14, 15, 15) WITH TOLERANCE 2
+```
+
+通过 CONTAIN 和 WITH TOLERANCE等关键字,IoTDB的查询处理器将这条查询语句交给索引机制执行,索引机制进一步调用相应的ELB索引。
+
+子序列检索的结果是长序列上的子片段,原本并不存在于数据库模式中。为了与IoTDB的其他查询结果格式保持一致,每一个检索结果单独成列,列名由“传感器序列名”和“子片段起始时间”连缀而成。如下图所示,复合模式的检索结果共两条,分别从时刻2019-10-18 12:30:00.000和2019-10-18 12:30:10.000开始。
+
+![image](https://user-images.githubusercontent.com/5548915/109539955-db07c980-7afc-11eb-8848-41983d393f45.png)
+
+删除索引操作与全序列检索场景类似,不再赘述。
+
+#### 用户功能:面向索引实现者
+
+索引机制致力于屏蔽数据库系统复杂的运行逻辑和消息处理,为开发者提供简洁而友好的索引集成平台。
+
+如果想为IoTDB添加一种新的索引技术,索引实现者需要继承`IoTDBIndex` 类,并在`IndexType` 这一枚举类中添加新的索引类型(“以硬编码的方式向枚举类中添加索引”仍然不够优雅,我们会在未来的版本中用其他方式取代)。
+
+##### IoTDBIndex
+
+`IoTDBIndex`是一个抽象类,一些关键的属性和抽象方法如下:
+
+```java
+// 本索引的特征处理器。所有特征处理器都继承自IndexFeatureExtractor类
+protected IndexFeatureExtractor indexFeatureExtractor;
+
+/**
+  为本索引使用的特征提取器进行初始化操作
+  previous:如果特征处理器在上一次系统关闭时存储了一些状态信息,这些状态信息会以previous参数传入
+  inQueryMode:用于索引写入还是索引查询。索引可以在两种场景中使用不同的特征提取器。
+  */
+public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+// 从特征提取器中获取最新产生的序列和特征信息,写入索引。
+public abstract boolean buildNext() throws IndexManagerException;
+
+// 系统关闭时,将内存中的索引数据序列化到磁盘
+protected abstract void flushIndex();
+
+// 执行索引查询。
+public abstract QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexCandidateOrderOptimize refinePhaseOptimizer,
+      boolean alignedByTime)
+      throws QueryIndexException;
+
+```
+
+##### IndexType
+
+每种索引都有自己的唯一类型`IndexType`。当索引实现者希望添加一种索引时,需要在`IndexType`中增加一种类型,并为以下函数添加一个新分支:
+
+```java
+public static IndexType deserialize(short i);
+
+public short serialize();
+
+private static IoTDBIndex newIndexByType(PartialPath path, TSDataType tsDataType, String indexDir, IndexType indexType, IndexInfo indexInfo);
+```
+
+##### IndexFeatureExtractor
+
+> 在索引框架的第二次PR中暂不涉及IndexFeatureExtractor的具体实现。
+
+在全序列特征提取器中,特征提取器接收一条原始数据作为输入,从中提取出三种信息,以应对索引生成和查询过程中存在的各种需求。三种信息从简单到复杂分别为:
+
+1. L1-序列指针:包含三元组信息:序列路径、起始时间戳和结束时间戳。
+2. L2-规整序列:将不定频、有缺失数据的序列转换为维度固定或时间间隔整齐的序列,供索引处理。
+3. L3-序列特征:基于规整序列进一步计算得到的特征。格式不固定。
+
+以上三种信息分别在索引读写过程中起到不同的作用。序列指针是指向原始数据的指针。
+由于IoTDB会在形成TsFile文件时对原始数据进行编码和压缩,因此无法简单地通过"文件路径+偏移量"来获取原始数据。取而代之的是,根据序列名和起止时间这三元组信息来唯一确定一段原始数据。序列指针会在特征文件和候选集中被使用;
+规整序列是多数索引可以接受的输入形式。在实验条件下,索引结构可以认为时序数据维度固定、时间戳等间隔且没有缺失值。但在实际情况中,序列可能存在微小的时间戳偏移或少量的数据错漏,规整序列对原始序列进行了预处理,去除了上述数据缺陷。
+序列特征是索引对原始序列进行特征提取后的结果。特征提取方法往往是索引技术的关键创新所在。
+
+如果索引实现者希望自定义实现特征提取器 `IndexFeatureExtractor` ,需要实现以下接口:
+
+```java
+// 加入一段原始数据以备处理
+public abstract void appendNewSrcData(TVList newData);
+
+// 加入一段原始数据以备处理,由于代码历史原因,IoTDB在数据写入时生成TVList,而查询时却会得到BatchData
+public abstract void appendNewSrcData(BatchData newData);
+
+// 索引框架会首先询问是否有下一条数据
+public abstract boolean hasNext();
+
+// 如果有,调用本方法产生一个待插入数据。注意,在appendNewSrcData中插入一段原始数据后,可能产生多条待插入数据
+public abstract void processNext();
+
+// 处理了一批数据后,调用本方法来释放已经处理过的数据的资源
+public abstract void clearProcessedSrcData();
+
+// 关闭特征处理器,返回状态数据,用于下一次打开时恢复状态
+public abstract ByteBuffer closeAndRelease() throws IOException;
+
+// 返回 L2-规整数据
+public Object getCurrent_L2_AlignedSequence();
+
+// 返回 L3-特征
+public Object getCurrent_L3_Feature();
+```
+
+
+
+#### 系统参数配置
+
+系统参数如下:
+
+| 参数名                                | 类型   | 默认值      | 说明                                                 |
+| ------------------------------------- | ------ | ----------- | ---------------------------------------------------- |
+| enable_index                          | bool   | false       | 是否开启索引                                         |
+| index_root_dir                        | string | data/index  | 所有索引文件的总目录                                 |
+| concurrent_index_build_thread         | int    | 系统CPU核数 | 执行索引写入时的最大并行线程数量                     |
+| default_index_window_range            | int    | 10          | 目前未用到,拟删除                                   |
+| index_buffer_size                     | long   | 134217728   | 目前未用到,拟删除                                   |
+| max_index_query_result_size           | int    | 5           | 索引检索结果的最大返回数量                           |
+| default_max_size_of_unusable_segments | int    | 20          | 可用区间管理器的最大段数,见`SubMatchIndexUsability` |
+
+### 模块设计
+
+下图展示了IoTDB索引机制的模块设计图。索引机制会与IoTDB中的多个模块(绿色框图)产生消息和数据交互(绿色箭头)。例如,在IoTDB系统启动和关闭时会启动或关闭索引机制;当某条序列有新的数据点写入时,IoTDB存储管理器会调用索引机制采取相应操作;当用户发起索引查询时,IoTDB查询处理器会将查询条件转交给索引机制,获取索引查询结果并返回给用户。
+
+![image-20210227025206792](https://user-images.githubusercontent.com/5548915/109539696-92501080-7afc-11eb-8621-a7d4cabcd94c.png)
+
+索引机制内部的各个模块(蓝色框图)也会产生消息和数据交互。索引实现者可以继承指定接口为索引机制添加新的索引实例。通过这些接口,索引机制会调用索引实例完成索引写入和查询等功能(蓝色箭头)。
+
+下面分别介绍各个模块的功能和接口。
+
+#### 特征提取器
+
+对应代码类:`IndexFeatureExtractor` 及其子类。
+
+特征提取器负责对IoTDB的原始序列进行预处理和特征提取。在索引写入阶段,对于全序列检索,特征提取器对整条序列进行特征提取;对于子序列检索,特征提取器首先利用滑动窗口模型将长序列截取为短片段,然后提取特征。特征方法和相关参数在创建索引的时候指定;在索引查询阶段,特征提取器为查询序列提取特征。由于索引技术细节的不同,查询阶段的特征方法可能与写入阶段的特征方法相同或不同。
+系统已经内置了几种较为常见的特征提取器。索引实现者可以直接选用内置的特征提取器,或者实现自定义的新特征提取器。特征提取器的接口详见2.2.2.3节:IndexFeatureExtractor。
+
+#### 索引调度器
+
+首先介绍“索引序列”概念:"索引序列"是一个索引实例覆盖的"序列"或"序列集合"。索引序列(或索引序列集合)的所有数据会写入同一个索引实例中。例如,在全序列索引的例子中,`root.Ery.*.Glu ` 就是这个索引的“索引序列”,而在子序列索引的例子中,`root.Wind.AZQ02.Speed` 就是ELB索引的“索引序列”。
+
+每个索引在创建的时候会指定它的“索引序列”,索引序列可能包含一条或多条时间序列。两个“索引序列”之间可能有交集。一个索引序列上可能会创建多种索引。
+
+索引、索引序列和时间序列的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540557-9a5c8000-7afd-11eb-9be3-ede027e81bb2.png)
+
+每个索引序列上可能会创建多种索引实例,索引框架中的每个索引序列对应一个“实例调度器”(IndexProcessor),负责调度该索引序列下的所有索引实例。
+
+总调度器(IndexManager)、实例调度器(IndexProcessor)和索引实例(Index)的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540566-9c264380-7afd-11eb-92ef-c9bcc019f4f0.png)
+
+
+
+**总调度器**:对应代码类:`IndexManager` 
+
+IndexManager是索引机制的全局管理者,IoTDB接到的索引创建、删除、查询以及写入均会调用IndexManager。IndexManager会将索引操作分发给相应的IndexProcessor执行。
+
+最初的想法是,"剪枝阶段"由索引实例得到候选集,而"精化阶段"完全由索引框架完成(即对候选集所对应的原始数据进行遍历)。这样的方案是足够通用的。然而,索引技术有各种优化,强制执行上述策略会影响索引的集成。例如,"剪枝阶段"也可能要访问原始数据(RTree),精化阶段也有特定的优化(ELB-Index)。由于已经实现的这两种索引技术均有各自的优化,因此,当前的查询接口将整个查询过程全部交给索引实例完成。
+
+接口如下:
+
+```java
+// 创建索引,目前indexSeriesList中仅包含单条路径。
+public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo);
+
+// 删除索引
+public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType);
+
+// 索引查询。
+public QueryDataSet queryIndex(List<PartialPath> paths, IndexType indexType,
+      Map<String, Object> queryProps, QueryContext context, boolean alignedByTime)
+
+// 关闭操作
+private synchronized void close();
+
+```
+
+
+
+**实例调度器**:对应代码类:` IndexProcessor`
+
+每个IndexProcessor对应一个索引序列,负责管理一个索引序列下的所有索引实例操作。
+
+属性如下:
+
+```java
+/**
+每个索引实例在其关闭时会生成一段状态信息(previousData),用于下一次加载时恢复到当前状态。每个 IndexProcessor 的所有 previousData 会被存储于同一个文件。索引可以管理和存储自身的信息,因此 previousData 不是必须的。但由于特征处理器也会产生 previousData,如果索引不想管理这部分信息,则可以将其抛给 IndexProcessor 管理
+*/
+private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+// 每个索引实例对应一个可用区间管理器(IIndexUsable),每个IndexProcessor的所有IIndexUsable会被存储于同一个文件
+private Map<IndexType, IIndexUsable> usableMap;
+
+// 精化阶段的优化器。目前并未用到。未来设计详见{@link IIndexCandidateOrderOptimize}.
+private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+```
+
+接口如下:
+
+```java
+// 将已排序的序列写入所有索引中
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 需要等待索引全部刷写完才会返回
+public void endFlushMemTable();
+
+// 将除了NoIndex之外的索引实例刷出到磁盘
+public synchronized void close(boolean deleteFiles) throws IOException;
+
+/**
+根据传入的 indexInfoMap 刷新当前IndexProcessor中的索引实例。当用户进行创建或删除索引,则IndexProcessor中维护的索引实例就过期了,因此需要刷新。
+*/
+public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap);
+
+// 对于乱序数据,目前的设计中直接将数据标记为"索引不可用"
+void updateUnsequenceData(PartialPath path, TVList tvList);
+```
+
+
+
+**索引刷写任务器**,对应`IndexMemTableFlushTask`。
+
+在当前的设计中,索引不会每次有新数据点到来就实时更新,而是在IoTDB的flush时批量写入。当一个存储组执行flush任务时会创建一个flush线程`MemTableFlushTask`。如果系统索引功能开启(enableIndex==true), `MemTableFlushTask` 会将MemTable中的时间序列交给IndexManager写入。考虑到系统代码解耦,在flush过程中的所有索引相关操作封装在一个``IndexMemTableFlushTask`` 中。
+
+不直接写入IndexManager,而是要向IndexManager申请一个``IndexMemTableFlushTask``对象的原因是,IoTDB的存储组的flush任务是独立而并行的,但IndexManager是全局单例的。同时,IoTDB的不同存储组所创建的索引也可能互相不覆盖(即对应不同的IndexProcessor,互相可能并没有并行冲突),因此,每个存储组在flush的时候,向IndexManager申请独立的`IndexMemTableFlushTask` 可以提升并行效率。
+
+接口如下:
+
+```java
+// 插入排序后的序列
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 等待所有IndexProcessor写入完成后再返回
+public void endFlush();
+```
+
+
+
+### 特征文件管理器、索引文件管理器
+
+特征文件管理器指定特征文件的存放位置,索引文件管理器指定索引内存结构刷写到磁盘的文件位置。这两个单元的功能都蕴含在索引调度器中。
+
+### 注册表
+
+对应`IIndexRouter` 及其子类。
+
+`IIndexRouter`的第一个功能是管理索引实例的元数据。当新的索引被创建或删除后,注册表会同步更新。
+
+`IIndexRouter`的第二个功能是将索引操作命令高效地传递到相应的`IndexProcessor`。考虑到`IoTDBIndex`、`IndexProcessor`的映射关系较为复杂且可能在未来的设计中有所改动,`IIndexRouter` 可以将这些映射工作与`IndexManager` 解耦。
+
+接口如下:
+
+```java
+// 添加索引
+boolean addIndexIntoRouter(PartialPath prefixPath, IndexInfo indexInfo, CreateIndexProcessorFunc func, boolean doSerialize) throws MetadataException;
+
+// 删除索引
+boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType);
+
+// 返回给定 indexSeries 下面的索引信息
+Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+// 获取所有 IndexProcessor 及其信息
+Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+// 返回给定 timeSeries 所属的所有 IndexProcessors
+Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+// 序列化
+void serialize(boolean doClose);
+
+// 反序列化
+void deserializeAndReload(CreateIndexProcessorFunc func);
+
+// 返回一个存储组相关的 IIndexRouter 子集。这是为了提高 IIndexRouter 访问的并行性
+IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+// 开始查询
+IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException;
+
+// 结束查询
+void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+```
+
+**ProtoIndexRouter** :是 `IIndexRouter` 的一种简单实现。
+
+子序列索引针对单一序列创建,而全序列索引针对有通配符的一组序列创建,因此 `ProtoIndexRouter` 对两种场景用不同的Map结构管理。
+
+```java
+// 子序列索引,索引序列为全路径
+private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+
+// 全序列索引,索引序列包含通配符
+private Map<PartialPath, IndexProcessorStruct> wildCardProcessorMap;
+```
+
+`IIndexRouter` 的关键功能是为一个序列快速找到其所属的`IndexProcessor`。如果该序列是全路径的,则可以在`fullPathProcessorMap` 中以 $O(1)$ 找到;否则,必须遍历 `wildCardProcessorMap` 中的每一个包含通配符的路径。
+
+
+
+### 可用区间管理器
+
+可用区间管理器用于处理乱序数据。
+大多数时间序列数据点会按照数据时间戳顺序到来,但当乱序数据到来,或者用户手动更新了某段数据后,这一段数据上的索引正确性无法保证。
+对于这些数据,可用区间管理器将其标记为索引不可用。不可用区间的序列将被加入候选集中,接受"精化阶段"的检验。
+
+接口如下:
+
+```java
+// 增加可用区间
+void addUsableRange(PartialPath fullPath, long start, long end);
+
+// 增加不可用区间
+void minusUsableRange(PartialPath fullPath, long start, long end);
+
+// 获取不可用区间,注意,全序列索引和子序列索引的返回格式不同
+Object getUnusableRange();
+
+// 序列化
+void serialize(OutputStream outputStream) throws IOException;
+
+// 反序列化
+void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+```
+
+当前版本针对全序列索引和子序列索引,分别实现了一种可用区间管理器。
+
+**SubMatchIndexUsability**:针对子序列索引的可用区间管理器。
+
+子序列索引的“索引序列”为单一序列,不包含通配符。如果这条长序列上的某一子片段被更改,则将这一片段标记为“索引不可用”,剩余片段则仍为“索引可用”区间。
+
+当前的实现采用一个链表来标记索引不可用的片段。每个链表的节点为 `RangeNode` ,代表一段不可用的时间段。
+
+```java
+class RangeNode {
+    long start;
+    long end;
+    RangeNode next;
+}
+```
+
+`SubMatchIndexUsability` 的构造函数有两个:
+
+```java
+/**
+ * 构造函数
+ * @param maxSizeOfUsableSegments 不可用区间的最大段数
+ * @param initAllUsable 如果为true,最初时间段均为“索引可用”,如果为false,最初均为“索引不可用”
+ */
+SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+  ...
+}
+```
+
+当前实现中对于 `RangeNode` 链表的访问和更新是线性遍历的,然而用户可以指定不可用区间的最大段数 $M$ ,因此链表的访问和更新复杂度可以控制在 `O(M)` ,即常数量级。当不可用区间的段数增加到上限时,会将新的不可用区间与较近的区间合并,从而控制区间段数。这样会使得“不可用区间”范围扩大,但将“可用区间”标记为“不可用区间”仅会让一些“被剪枝”的序列进入精化阶段,而不会造成漏解(即,将本来正确的结果错误地排除掉),因此 $M$ 参数并不会影响索引查询的正确性。
+
+`getUnusableRange()` 函数返回一系列不可用区间的时间过滤器:`List<Filter>` 。
+
+例如,在序列 `root.Wind.AZQ02.Speed` 上创建子序列索引。
+
+1.  `initAllUsable = false`,最初整个区间段均为“索引不可用”;`maxSizeOfUsableSegments=2`,最多允许两个索引不可用片段存在;
+2. 增加可用片段 $[5,7]$ ,不可用区间变为  $[MIN,4] \cup [8,MAX]$ ;
+3. 增加可用片段 $[2,3]$ ,不可用区间应当分裂为  $[MIN,1] \cup [4,4] \cup [8,MAX]$ ,然而不可用区间的段数超过了上限,因此停止分裂,仍然保持  $[MIN,4] \cup [8,MAX]$ 。
+
+**WholeMatchIndexUsability**:针对全序列索引的可用区间管理器。
+
+全序列索引的“索引序列”包含通配符,每一条符合规则的时间序列作为一个整体插入索引。因此,对于一条序列的任何更改都会使整条序列变为“索引不可用”。因此,本类采用一个集合 `unusableSeriesSet` 来标记索引不可用的序列。
+
+例如,在索引序列 `root.Ery.*.Glu ` 之上创建全序列索引。当某条序列 `root.Ery.01.Glu` 被更改,则将其加入集合 `unusableSeriesSet` ,该条序列被标记为“索引不可用”。
+
+`getUnusableRange()` 函数返回被标记为“索引不可用”的序列的集合 `Set<PartialPath>` 。
+
+### 索引查询处理器
+
+查询处理模块负责执行索引相关的查询。包括索引查询执行器、查询优化器,在查询时也会用到可用区间管理器。
+
+**查询执行器**:对应类`QueryIndexExecutor` 。
+
+尽管我们可以从相似性索引的查询过程中提取共性,将其分为“剪枝阶段”和“精化阶段”,并且精化阶段可以由索引框架接管,但许多索引技术(包括当前实现的两种索引:RTree和ELB索引)会将“剪枝阶段”和“精化阶段”结合起来优化,强制接管“精化阶段”可能会影响索引查询效率。因此,现阶段的查询处理器直接调用`IndexManager#queryIndex` 函数。
+
+**查询结果类**:对应类 `IndexQueryDataSet` 继承自 `ListDataSet`。
+
+
+
+### 索引接口
+
+对应`IoTDBIndex` 。
+
+索引接口是索引实现者唯一需要实现的部分,接口包括指定特征提取器、写入索引、序列化索引、索引查询等等,详细阐述参见2.2.2.1节。
+
+## 流程设计
+
+### 索引写入流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540629-b6602180-7afd-11eb-8dbe-e38c91921a7e.png)
+
+
+
+上图展现了索引机制的写入流程:
+
+* 当用户向数据库系统写入新的数据后,数据库系统通知索引总调度器 `IndexManager`。
+* 总调度器首先向注册表进行校验,判断数据所在的序列是否创建过索引。
+* 当校验成功后,总调度器将数据传递到相应的索引实例调度器,图中省略这一步骤。
+* 调度器首先使用特征提取器将新数据转换为特征,然后由索引实例消费掉。
+* 当数据写入索引实例后,更新可用区间管理器。至此,完成整个写入流程。
+
+在当前的实现中,仅当触发IoTDB的刷写操作时(`MemTableFlushTask`),索引机制才触发索引的批量写入操作。这样带来两个好处:首先,在批量写入的情况下,
+传感器将多个数据点积攒成序列,方便了特征提取器的处理;其次,考虑到工业物联网数据的乱序情况,IoTDB会将一段时间内的数据进行排序再传给索引机制,这样减少了乱序数据给索引带来的负面影响。
+
+延迟写入索引并不会影响索引查询的正确性。对于已经写入IoTDB但尚未触发刷写操作的数据,索引机制中的可用区间管理器会将其标记为"索引不可用",因此,这部分数据会直接进入精化阶段进行计算,不会被遗漏。
+
+### 索引查询流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540634-b95b1200-7afd-11eb-800a-71b3b11ce16d.png)
+
+当用户发起索引查询时,索引机制为其创建单独的线程并执行查询。相似性索引的一般查询流程如下:
+
+1. 用户发起索引查询时,IoTDB会为其创建一个单独的线程并执行整个查询流程。这一执行线程调用索引机制并传入查询条件。索引机制首先调用注册表,判断该序列上是否创建过索引、索引是否支持这一查询条件。
+2. 校验成功后,`IndexManager` 根据查询条件中指定的查询目标,将执行线程和查询条件传递到相应的索引实例调度器`IndexProcessor` 。
+3. 执行线程调用特征提取器 `FeatureExtractor`,对查询序列提取特征。然后将查询特征及其他查询条件输入索引实例 `IoTDBIndex` 。
+   1. 执行过滤阶段,并返回候选集,候选集中包含了一系列指向原始数据的指针。至此,查询处理模块的**过滤单元**完成。
+   2. 在进入精化阶段前,执行线程首先调用可用区间管理器,将索引不可用区间中的序列加入候选集中,避免乱序数据造成索引错误剪枝。
+   3. 然后调用查询优化器,对候选集的访问顺序进行重排序。
+   4. 接下来,执行线程访问IoTDB,读取候选集所指向的原始数据并完成精确计算。
+   5. 至此,查询处理模块的精化单元完成,返回结果。
+
+上述流程的第三步中,索引框架接管了精化阶段。但如果索引实例有更多的优化,也可以完全接管查询过程并直接返回结果。反映到模块调用上,即 `IndexProcessor` 直接调用 `IoTDBIndex#query` 函数,返回 `QueryDataSet` 。
+
+### 索引更新与删除
+
+索引更新主要涉及两个方面。首先,当IoTDB接收乱序数据后,索引可用区间管理器会相应更新,从而保证后续查询的正确性。其次,当IoTDB完成乱序数据整理和合并后,索引框架将对索引发起重建,这一步在未来实现。
+
+当索引被删除后,索引目录下的文件将被删除。
+
+
+
+## 文件结构
+
+![image](https://user-images.githubusercontent.com/5548915/109540601-a9433280-7afd-11eb-8bdb-3f77181c1637.png)
+
+上图左图是索引框架的文件组织结构。
+
+* 总文件夹index下包括数据文件夹和元数据文件夹;
+  * 元数据文件夹中目前仅包括注册表文件夹;
+  * 数据文件夹下,每个索引序列(对应一个 `IndexProcessor` )都对应一个文件夹;
+    * 索引序列文件夹会创建一个状态文件和可用区间管理器文件;
+    * 每个索引序列文件夹下可能创建了多个索引实例;
+
+上图右图是一个例子。
+
+
+
+## 并行性分析(TODO)
+
+## 复杂度分析(TODO)
+
+量化,时间复杂度,内存/磁盘空间复杂度。
+
+
+
+## 问题讨论
+
+### 索引读写并发能力-锁粒度
+
+为每一个索引实例分配一个读写锁,并由IndexProcessor控制索引的读写。
+
+由于原始数据是分批刷写磁盘的,因此索引也需要支持批量更新。这意味着索引不能在创建索引之前就看到全部原始数据。一些需要基于全局数据确定参数的索引的性能可能会受到影响(如VaFile和基于Kmeans的倒排索引)。
+
+目前为每个索引维护了读写锁(`org.apache.iotdb.db.index.IndexProcessor#indexLockMap`)。允许对索引的并发查询,但索引的写入和查询是互斥的。这一设计有两点值得讨论:
+
+1. 查询并发:并发查询提升了查询效率,但要求索引是查询并发安全的;
+2. 读写互斥:这意味着IoTDB刷写操作和索引查询之间会互相阻塞,无疑降低了效率。然而要取消这一限制,则要求索引是读写并发安全的。
+
+根据开发者对一些开源索引代码的调研,索引通常可以满足查询并发安全,但较难满足读写并发安全,因此设计了读写锁机制。但是,值得讨论的是,是否对集成到IoTDB的索引代码提出更高的要求,从而提升IoTDB的效率呢?
+
+### 索引更新删除能力-可用区间
+
+TODO
+
+
+
+# 未来规划
+
+预计在第三次PR中提交的模块:
+
+* 特征提取器的实现 `IndexFeatureExtractor` :包括全序列检索和子序列检索中一些常见的特征提取方法,例如全序列检索用到的PAA特征、子序列检索中用到的两种滑动窗口模型等等;
+* 索引算法:基于以上特征提取器,实现了两种基本的检索算法:基于PAA特征的R树索引和等长特征块索引(ELB,回答子序列检索)
+
+留待未来实现的模块如下:
+
+* 统计模块IndexStats:几乎所有相似性索引(甚至可以推广到所有索引)都会关注一些共通的统计量,例如:CPU时间、磁盘访问时间、内存占用、磁盘占用、查询剪枝率、近似查询中的查准率和查全率等等。统计模块为其提供全面而标准的工具函数,用于索引性能评估和不同索引之间的性能比较;
+* 候选集顺序优化器 `IIndexCandidateOrderOptimize`:相似性索引剪枝后得到候选集列表。由于索引并未考虑IoTDB的数据组织方式,因此候选集的顺序可能不是最优的。候选集顺序优化器对此进行优化;
+* 其他索引技术
+
+
+
+# 测试验证(TODO)
+
+## 测试目标
+
+正确性测试或性能(对比)测试
+
+##  测试方案
+
+\- 测试系统
+
+\- 被测系统
+
+\- 测试环境
+
+\- 负载描述
+
+## 测试结果
+
+## 测试结论
\ No newline at end of file
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index eb527ae..476d547 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -657,6 +657,12 @@ default_index_window_range=10
 # buffer parameter for index processor.
 index_buffer_size=134217728
 
+# the max returned size of index query result
+max_index_query_result_size=5
+
+# the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+default_max_size_of_unusable_segments=20
+
 # whether enable data partition. If disabled, all data belongs to partition 0
 enable_partition=false
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 030fde2..4e44ef8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
 import org.apache.iotdb.db.exception.LoadConfigurationException;
+import org.apache.iotdb.db.index.IndexProcessor;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -247,11 +248,11 @@ public class IoTDBConfig {
   /**
    * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
    * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
-   * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
-   * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
-   * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
-   * will be flushed to the disk file. As a result, data in one series may be divided into more than
-   * one part and indexed separately.
+   * one {@linkplain IndexProcessor IndexProcessor} share a total common buffer size. With the
+   * memory-control mechanism, the occupied memory of all raw data and index structures will be
+   * counted. If the memory buffer size reaches this threshold, the indexes will be flushed to the
+   * disk file. As a result, data in one series may be divided into more than one part and indexed
+   * separately.
    */
   private long indexBufferSize = 128 * 1024 * 1024L;
 
@@ -261,6 +262,10 @@ public class IoTDBConfig {
    */
   private int defaultIndexWindowRange = 10;
 
+  /**
+   * the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+   */
+  private int defaultMaxSizeOfUnusableSegments = 20;
   /** index directory. */
   private String indexRootFolder = "data" + File.separator + "index";
 
@@ -637,6 +642,8 @@ public class IoTDBConfig {
   /** the number of virtual storage groups per user-defined storage group */
   private int virtualStorageGroupNum = 1;
 
+  private int maxIndexQueryResultSize = 5;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2035,6 +2042,22 @@ public class IoTDBConfig {
     this.defaultIndexWindowRange = defaultIndexWindowRange;
   }
 
+  public int getMaxIndexQueryResultSize() {
+    return this.maxIndexQueryResultSize;
+  }
+
+  public void setMaxIndexQueryResultSize(int maxIndexQueryResultSize) {
+    this.maxIndexQueryResultSize = maxIndexQueryResultSize;
+  }
+
+  public int getDefaultMaxSizeOfUnusableSegments() {
+    return defaultMaxSizeOfUnusableSegments;
+  }
+
+  public void setDefaultMaxSizeOfUnusableSegments(int defaultSizeOfUsableSegments) {
+    this.defaultMaxSizeOfUnusableSegments = defaultSizeOfUsableSegments;
+  }
+
   public int getVirtualStorageGroupNum() {
     return virtualStorageGroupNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ea507f9..f1a86b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -436,6 +436,18 @@ public class IoTDBDescriptor {
         conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
       }
 
+      conf.setMaxIndexQueryResultSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "max_index_query_result_size",
+                  Integer.toString(conf.getMaxIndexQueryResultSize()))));
+
+      conf.setDefaultMaxSizeOfUnusableSegments(
+          Integer.parseInt(
+              properties.getProperty(
+                  "default_max_size_of_unusable_segments",
+                  Integer.toString(conf.getDefaultMaxSizeOfUnusableSegments()))));
+
       conf.setmManagerCacheSize(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..21b0e3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.IndexMemTableFlushTask;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -58,7 +61,9 @@ public class MemTableFlushTask {
           ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
           : new LinkedBlockingQueue<>();
 
+  private final boolean enabledIndex;
   private String storageGroup;
+  private final boolean sequence;
 
   private IMemTable memTable;
 
@@ -71,7 +76,7 @@ public class MemTableFlushTask {
    * @param storageGroup current storage group
    */
   public MemTableFlushTask(
-      IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+      IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup, boolean sequence) {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
@@ -81,6 +86,8 @@ public class MemTableFlushTask {
         "flush task of Storage group {} memtable is created, flushing to file {}.",
         storageGroup,
         writer.getFile().getName());
+    this.enabledIndex = IoTDBDescriptor.getInstance().getConfig().isEnableIndex();
+    this.sequence = sequence;
   }
 
   /** the function for flushing memtable. */
@@ -100,6 +107,14 @@ public class MemTableFlushTask {
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
+    IndexMemTableFlushTask indexFlushTask = null;
+    if (enabledIndex) {
+      try {
+        indexFlushTask = IndexManager.getInstance().getIndexMemFlushTask(storageGroup, sequence);
+      } catch (Exception e) {
+        LOGGER.error("meet Exception in getIndexMemFlushTask, not affect the memtable flushing", e);
+      }
+    }
 
     // for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
@@ -114,6 +129,16 @@ public class MemTableFlushTask {
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.put(new Pair<>(tvList, desc));
+        if (enabledIndex && indexFlushTask != null) {
+          try {
+            String deviceId = memTableEntry.getKey();
+            String measurementId = iWritableMemChunkEntry.getKey();
+            indexFlushTask.buildIndexForOneSeries(new PartialPath(deviceId, measurementId), tvList);
+          } catch (Exception e) {
+            LOGGER.error(
+                "meet Exception in buildIndexForOneSeries, not affect the memtable flushing", e);
+          }
+        }
       }
 
       encodingTaskQueue.put(new EndChunkGroupIoTask());
@@ -133,6 +158,13 @@ public class MemTableFlushTask {
     }
 
     ioTaskFuture.get();
+    if (enabledIndex && indexFlushTask != null) {
+      try {
+        indexFlushTask.endFlush();
+      } catch (Exception e) {
+        LOGGER.error("meet Exception in endFlush, not affect the memtable flushing", e);
+      }
+    }
 
     try {
       writer.writePlanIndices();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..6a73393 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -775,7 +775,7 @@ public class TsFileProcessor {
       try {
         writer.mark();
         MemTableFlushTask flushTask =
-            new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+            new MemTableFlushTask(memTableToFlush, writer, storageGroupName, sequence);
         flushTask.syncFlushMemTable();
       } catch (Exception e) {
         if (writer == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
index 7b8b8cf..89bc2fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
@@ -18,6 +18,7 @@
 package org.apache.iotdb.db.exception.index;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 public class QueryIndexException extends QueryProcessException {
 
@@ -26,4 +27,8 @@ public class QueryIndexException extends QueryProcessException {
   public QueryIndexException(String message, int errorCode) {
     super(message, errorCode);
   }
+
+  /**
+   * Creates an index query exception whose error code is the status code of {@link
+   * TSStatusCode#INDEX_QUERY_ERROR}.
+   *
+   * @param message the detail message
+   */
+  public QueryIndexException(String message) {
+    super(message, TSStatusCode.INDEX_QUERY_ERROR.getStatusCode());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
index 7b8b8cf..78ba38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
@@ -17,13 +17,13 @@
  */
 package org.apache.iotdb.db.exception.index;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-public class QueryIndexException extends QueryProcessException {
+public class UnsupportedIndexFuncException extends QueryIndexException {
 
-  private static final long serialVersionUID = 9019233783504576296L;
+  private static final long serialVersionUID = 4185676677334759039L;
 
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
+  public UnsupportedIndexFuncException(String message) {
+    super(message, TSStatusCode.UNSUPPORTED_INDEX_FUNC_ERROR.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
new file mode 100644
index 0000000..1104c99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton pool manager for index building tasks. The pool is created through {@link
+ * IoTDBThreadPoolFactory#newFixedThreadPool} with the thread count returned by {@code
+ * getConcurrentIndexBuildThread()} of the IoTDB configuration, which limits how many index
+ * insertions run at the same time.
+ */
+public class IndexBuildTaskPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildTaskPoolManager.class);
+
+  private IndexBuildTaskPoolManager() {
+    initPool();
+  }
+
+  /** @return the single instance of this pool manager. */
+  public static IndexBuildTaskPoolManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  /** Assigns a newly created pool sized by the configured index-build thread count. */
+  private void initPool() {
+    pool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getConcurrentIndexBuildThread(),
+            ThreadName.INDEX_SERVICE.getName());
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "index building task";
+  }
+
+  /** Re-creates the pool if it has been released by {@link #stop()}; otherwise does nothing. */
+  @Override
+  public void start() {
+    if (pool != null) {
+      return;
+    }
+    initPool();
+  }
+
+  /** Closes the pool and drops the reference to it; does nothing when there is no pool. */
+  @Override
+  public void stop() {
+    if (pool == null) {
+      return;
+    }
+    close();
+    pool = null;
+  }
+
+  private static class InstanceHolder {
+
+    private static IndexBuildTaskPoolManager instance = new IndexBuildTaskPoolManager();
+
+    private InstanceHolder() {
+      // allowed to do nothing
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
new file mode 100644
index 0000000..053596b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_DATA_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.META_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.ROUTER_DIR;
+
+/**
+ * IndexManager is the global manager of index framework, which will be called by IoTDB when index
+ * creation, deletion, query and insertion. IndexManager will pass the index operations to the
+ * corresponding IndexProcessor.
+ */
+public class IndexManager implements IndexManagerMBean, IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+  /**
+   * The index root directory. All index metadata files and index data files are stored under this
+   * directory.
+   */
+  private final String indexRootDirPath;
+
+  /** Metadata directory: {@code META_DIR_NAME} under the index root directory. */
+  private final String indexMetaDirPath;
+  /** Router directory: {@code ROUTER_DIR} under the metadata directory. */
+  private final String indexRouterDir;
+  /** Data directory: {@code INDEX_DATA_DIR_NAME} under the index root directory. */
+  private final String indexDataDirPath;
+  /**
+   * Router obtained from {@code IIndexRouter.Factory} for the router directory. Index creation,
+   * deletion, query and flush lookups in this class are delegated to it.
+   */
+  private final IIndexRouter router;
+
+  /** A function interface to construct an index processor. */
+  private CreateIndexProcessorFunc createIndexProcessorFunc;
+
+  /** Global IoTDB configuration; this class consults it to check whether index is enabled. */
+  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /**
+   * Resolves the index directory layout under the index root folder, prepares the factory used to
+   * create {@link IndexProcessor}s, and obtains the router bound to the router directory.
+   */
+  private IndexManager() {
+    String rootDir = DirectoryManager.getInstance().getIndexRootFolder();
+    String metaDir = rootDir + File.separator + META_DIR_NAME;
+    indexRootDirPath = rootDir;
+    indexMetaDirPath = metaDir;
+    indexRouterDir = metaDir + File.separator + ROUTER_DIR;
+    indexDataDirPath = rootDir + File.separator + INDEX_DATA_DIR_NAME;
+    // each processor stores its data in a sub-directory of the data directory named after its
+    // index series, after IndexUtils.removeIllegalStarInDir has been applied to the path
+    createIndexProcessorFunc =
+        (series, infoMap) ->
+            new IndexProcessor(
+                series,
+                IndexUtils.removeIllegalStarInDir(indexDataDirPath + File.separator + series));
+    router = IIndexRouter.Factory.getIndexRouter(indexRouterDir);
+  }
+
+  /**
+   * Builds the feature file directory of an index (unused currently). The directory is the index
+   * data directory, followed by the full path of the series and the index type, passed through
+   * {@code IndexUtils.removeIllegalStarInDir}.
+   *
+   * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+   * @param indexType the type of index
+   * @return the feature directory path for this index.
+   */
+  private String getFeatureFileDirectory(PartialPath path, IndexType indexType) {
+    String rawDir =
+        indexDataDirPath + File.separator + path.getFullPath() + File.separator + indexType;
+    return IndexUtils.removeIllegalStarInDir(rawDir);
+  }
+
+  /**
+   * Given an IndexSeries, return its data file path (unused currently). At present this simply
+   * delegates to {@link #getFeatureFileDirectory(PartialPath, IndexType)}, so both methods return
+   * the same directory.
+   *
+   * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+   * @param indexType the type of index
+   * @return the data directory path for this index.
+   */
+  private String getIndexDataDirectory(PartialPath path, IndexType indexType) {
+    return getFeatureFileDirectory(path, indexType);
+  }
+
+  /**
+   * Creates an index. The mapping between time series and index instances is complex, so the
+   * index metadata management is encapsulated in the router {@link IIndexRouter}; this method only
+   * forwards the first series of the list to it. An empty list is a no-op.
+   *
+   * @param indexSeriesList a singleton list up to now; only its first element is used.
+   * @param indexInfo the index information.
+   * @throws MetadataException if the router fails to add the index
+   */
+  public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo)
+      throws MetadataException {
+    if (indexSeriesList.isEmpty()) {
+      return;
+    }
+    PartialPath indexSeries = indexSeriesList.get(0);
+    router.addIndexIntoRouter(indexSeries, indexInfo, createIndexProcessorFunc, true);
+  }
+
+  /**
+   * Drops an index by forwarding the first series of the list to the router. An empty list is a
+   * no-op.
+   *
+   * @param indexSeriesList a singleton list up to now; only its first element is used.
+   * @param indexType the index type to be dropped.
+   * @throws MetadataException if the router fails to remove the index
+   * @throws IOException if the router fails to remove the index
+   */
+  public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType)
+      throws MetadataException, IOException {
+    if (indexSeriesList.isEmpty()) {
+      return;
+    }
+    PartialPath indexSeries = indexSeriesList.get(0);
+    router.removeIndexFromRouter(indexSeries, indexType);
+  }
+
+  /**
+   * Constructs the {@link IndexMemTableFlushTask} used for index insertion when a storage group
+   * flushes.
+   *
+   * <p>So far, index insertion is triggered only when memtables flush. A storage group contains
+   * several series and each of these series may have several indexes. In other words, one storage
+   * group may correspond to several {@linkplain IndexProcessor}s.
+   *
+   * <p>The returned task wraps the router obtained for this storage group, through which the
+   * related {@linkplain IndexProcessor}s are found.
+   *
+   * @param storageGroupPath the path of the storage group; only the part before the first
+   *     separator is used
+   * @param sequence true if it's sequence data, otherwise it's unsequence data
+   * @return a flush task holding the storage group's router and the sequence flag
+   * @see IndexMemTableFlushTask
+   */
+  public IndexMemTableFlushTask getIndexMemFlushTask(String storageGroupPath, boolean sequence) {
+    // StorageGroupPath may contain file separator, we put a temp patch here.
+    String normalizedPath = storageGroupPath.replace(File.separatorChar, '/');
+    String realStorageGroupPath = normalizedPath.split("/")[0];
+    return new IndexMemTableFlushTask(
+        router.getRouterByStorageGroup(realStorageGroupPath), sequence);
+  }
+
+  /**
+   * Index query.
+   *
+   * <p>The initial idea is that index instances only process the "pruning phase" to prune some
+   * negative items and return a candidate list, and the framework finishes the rest (the so-called
+   * "post-processing phase" or "refinement phase": querying the raw time series by the candidate
+   * list and then verifying which series in the candidate list are real positive results).
+   *
+   * <p>The above design is common enough for all similarity index methods. However, index
+   * technology has various optimizations, and enforcing the above strategy would limit the freedom
+   * of index integration. The two implemented indexes (ELB index and RTree index) have their own
+   * optimizations which combine the pruning phase and the post-processing phase. Therefore, in the
+   * current version, the query process is entirely passed to the index instance.
+   *
+   * @param paths the series to be queried; exactly one path is allowed.
+   * @param indexType the index type to be queried.
+   * @param queryProps the properties of this query.
+   * @param context the query context.
+   * @param alignedByTime whether aligned index result by timestamps.
+   * @return the index query result.
+   * @throws QueryIndexException if {@code paths} does not contain exactly one path, or the query
+   *     fails
+   * @throws StorageEngineException if the storage engine reports an error during the query
+   */
+  public QueryDataSet queryIndex(
+      List<PartialPath> paths,
+      IndexType indexType,
+      Map<String, Object> queryProps,
+      QueryContext context,
+      boolean alignedByTime)
+      throws QueryIndexException, StorageEngineException {
+    if (paths.size() != 1) {
+      throw new QueryIndexException("Index allows to query only one path");
+    }
+    PartialPath queryIndexSeries = paths.get(0);
+    IndexProcessorStruct indexProcessorStruct =
+        router.startQueryAndCheck(queryIndexSeries, indexType, context);
+    // Once startQueryAndCheck has succeeded, endQuery must always run, even when acquiring the
+    // merge locks or releasing them fails.
+    try {
+      List<StorageGroupProcessor> list = indexProcessorStruct.addMergeLock();
+      try {
+        return indexProcessorStruct.processor.query(indexType, queryProps, context, alignedByTime);
+      } finally {
+        StorageEngine.getInstance().mergeUnLock(list);
+      }
+    } finally {
+      router.endQuery(indexProcessorStruct.processor.getIndexSeries(), indexType, context);
+    }
+  }
+
+  /**
+   * Creates the index root, router, metadata and data directories (in this order) when they do not
+   * exist yet.
+   */
+  private void prepareDirectory() {
+    String[] dirPaths = {indexRootDirPath, indexRouterDir, indexMetaDirPath, indexDataDirPath};
+    for (String dirPath : dirPaths) {
+      File dir = IndexUtils.getIndexFile(dirPath);
+      if (!dir.exists()) {
+        dir.mkdirs();
+      }
+    }
+  }
+
+  /**
+   * Removes index data directories that no longer correspond to a registered index.
+   *
+   * <p>Every entry of the index data directory is treated as the directory of one index series.
+   * If the router knows no index for that series, the whole entry is deleted. Otherwise each
+   * sub-directory named after an {@link IndexType} that is absent from the router's infos is
+   * deleted. Sub-directories whose name is not an {@link IndexType} are left untouched and
+   * reported with a warning, so that a stray directory cannot abort the startup.
+   *
+   * @throws IOException if a directory cannot be deleted
+   * @throws IllegalPathException if a directory name cannot be converted to a {@link PartialPath}
+   */
+  private void deleteDroppedIndexData() throws IOException, IllegalPathException {
+    for (File processorDataDir :
+        Objects.requireNonNull(IndexUtils.getIndexFile(indexDataDirPath).listFiles())) {
+      String processorName = processorDataDir.getName();
+      Map<IndexType, IndexInfo> infos =
+          router.getIndexInfosByIndexSeries(new PartialPath(processorName));
+      if (infos.isEmpty()) {
+        FileUtils.deleteDirectory(processorDataDir);
+        continue;
+      }
+      for (File indexDataDir : Objects.requireNonNull(processorDataDir.listFiles())) {
+        if (!indexDataDir.isDirectory()) {
+          continue;
+        }
+        IndexType indexType;
+        try {
+          indexType = IndexType.valueOf(indexDataDir.getName());
+        } catch (IllegalArgumentException ignored) {
+          // the directory name is not an index type, so it cannot be matched against the infos
+          logger.warn(
+              "Skip directory {} because its name is not a known index type", indexDataDir);
+          continue;
+        }
+        if (!infos.containsKey(indexType)) {
+          FileUtils.deleteDirectory(indexDataDir);
+        }
+      }
+    }
+  }
+
+  /** Closes the index manager by asking the router to serialize itself. */
+  private synchronized void close() {
+    router.serialize(true);
+  }
+
+  /**
+   * Starts the index service. Nothing happens when index is disabled in the configuration.
+   * Otherwise the index build pool is started, this manager is registered as an MBean, the index
+   * directories are prepared, the router is reloaded and the data of dropped indexes is deleted.
+   *
+   * @throws StartupException wrapping any exception raised after the build pool has been started
+   */
+  @Override
+  public void start() throws StartupException {
+    if (config.isEnableIndex()) {
+      IndexBuildTaskPoolManager.getInstance().start();
+      try {
+        JMXService.registerMBean(this, ServiceType.INDEX_SERVICE.getJmxName());
+        prepareDirectory();
+        router.deserializeAndReload(createIndexProcessorFunc);
+        deleteDroppedIndexData();
+      } catch (Exception e) {
+        throw new StartupException(e);
+      }
+    }
+  }
+
+  /**
+   * Stops the index service. Nothing happens when index is disabled in the configuration.
+   *
+   * <p>As IoTDB has no normal shutdown mechanism, this function will not be called. To keep the
+   * index metadata safe, the router needs to serialize it every time createIndex or dropIndex is
+   * called.
+   */
+  @Override
+  public void stop() {
+    if (config.isEnableIndex()) {
+      close();
+      IndexBuildTaskPoolManager.getInstance().stop();
+      JMXService.deregisterMBean(ServiceType.INDEX_SERVICE.getJmxName());
+    }
+  }
+
+  /** @return the single {@link IndexManager} instance held by {@code InstanceHolder}. */
+  public static IndexManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  /** @return {@link ServiceType#INDEX_SERVICE}, the service type of this manager. */
+  @Override
+  public ServiceType getID() {
+    return ServiceType.INDEX_SERVICE;
+  }
+
+  /** Holds the single {@link IndexManager} instance returned by {@link #getInstance()}. */
+  private static class InstanceHolder {
+
+    // not meant to be instantiated
+    private InstanceHolder() {}
+
+    private static IndexManager instance = new IndexManager();
+  }
+
+  /**
+   * Closes the index manager and deletes the index metadata directory, the index data directory
+   * and finally the index root directory, each only if it exists. For tests only.
+   *
+   * @throws IOException if a directory cannot be deleted
+   */
+  @TestOnly
+  public synchronized void deleteAll() throws IOException {
+    // this method removes index directories, not storage groups or timeseries
+    logger.info("Start deleting all index metadata and index data");
+    close();
+
+    File indexMetaDir = IndexUtils.getIndexFile(this.indexMetaDirPath);
+    if (indexMetaDir.exists()) {
+      FileUtils.deleteDirectory(indexMetaDir);
+    }
+
+    File indexDataDir = IndexUtils.getIndexFile(this.indexDataDirPath);
+    if (indexDataDir.exists()) {
+      FileUtils.deleteDirectory(indexDataDir);
+    }
+    File indexRootDir =
+        IndexUtils.getIndexFile(DirectoryManager.getInstance().getIndexRootFolder());
+    if (indexRootDir.exists()) {
+      FileUtils.deleteDirectory(indexRootDir);
+    }
+  }
+
+  /** @return the router used by this manager; exposed for tests only. */
+  @TestOnly
+  public IIndexRouter getRouter() {
+    return router;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
new file mode 100644
index 0000000..95cb0a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+/** MBean interface of the index manager; it currently declares no management operations. */
+public interface IndexManagerMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
new file mode 100644
index 0000000..dea0b1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+/**
+ * IndexMemTableFlushTask performs the index insertion that accompanies a memtable flush. IoTDB
+ * creates a MemTableFlushTask when a memtable is flushed to disk. After the MemTableFlushTask has
+ * sorted a series, the series is handed to an IndexMemTableFlushTask, which looks up the index
+ * processors related to that series through its router and passes the data on to them.
+ *
+ * <p>One IndexMemTableFlushTask may cover more than one index processor.
+ */
+public class IndexMemTableFlushTask {
+
+  private final IIndexRouter router;
+  private final boolean sequence;
+
+  /**
+   * Creates the task; it should be immutable. For sequence data, every index processor known to
+   * the router is notified that a memtable flush starts.
+   *
+   * @param router the router used to find the related index processors
+   * @param sequence true if the flushed data is sequence data
+   */
+  IndexMemTableFlushTask(IIndexRouter router, boolean sequence) {
+    this.router = router;
+    this.sequence = sequence;
+    // in current version, we don't build index for unsequence block
+    if (!sequence) {
+      return;
+    }
+    // check all processors
+    router.getAllIndexProcessorsAndInfo().forEach(p -> p.processor.startFlushMemTable());
+  }
+
+  /**
+   * Passes one sorted time series to every index processor the router returns for its path.
+   *
+   * @param path the path of time series
+   * @param tvList the sorted data
+   */
+  public void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+    if (!sequence) {
+      // in current version, we don't build index for unsequence block, but only update the index
+      // usability range.
+      router.getIndexProcessorByPath(path).forEach(p -> p.updateUnsequenceData(path, tvList));
+      return;
+    }
+    router.getIndexProcessorByPath(path).forEach(p -> p.buildIndexForOneSeries(path, tvList));
+  }
+
+  /** wait for all IndexProcessors to finish building indexes; only applies to sequence data. */
+  public void endFlush() {
+    if (!sequence) {
+      return;
+    }
+    for (IndexProcessorStruct struct : router.getAllIndexProcessorsAndInfo()) {
+      struct.processor.endFlushMemTable();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
new file mode 100644
index 0000000..95cf721
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.IndexRuntimeException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.IndexNaiveFunc;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Each {@code IndexProcessor} manages all index instances under an <b>IndexSeries</b>.
+ *
+ * <p>An <b>IndexSeries</b> is one series or a set of series that an index instance acts on.
+ *
+ * <ul>
+ *   <li>For whole matching, the IndexSeries is a set of series represented by a path with wildcard
+ *       characters. For example, creating index on IndexSeries "root.steel.*.temperature" means all
+ *       series belong to this IndexSeries will be inserted, such as root.steel.s1.temperature,
+ *       root.steel.s2.temperature, ...
+ *   <li>For subsequence matching, the IndexSeries is a full path. For example, creating index on
+ *       IndexSeries "root.wind.azq01.speed" means all subsequence of this series will be inserted.
+ * </ul>
+ *
+ * In the current design, users are allowed to create more than one type of index on an IndexSeries, e.g.
+ * we can create RTree and DS-Tree on "root.steel.*.temperature". New coming data will be inserted
+ * into both of these two indexes.
+ */
+public class IndexProcessor implements Comparable<IndexProcessor> {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexProcessor.class);
+
+  /** The IndexSeries (see the class comment) whose index instances this processor manages. */
+  private PartialPath indexSeries;
+  /** Directory of this processor; the directory of each index is derived from it by index type. */
+  private final String indexSeriesDirPath;
+  /** Data type of the IndexSeries, resolved once in the constructor. */
+  private TSDataType tsDataType;
+  /** The pool that index build tasks are submitted to. */
+  private final IndexBuildTaskPoolManager indexBuildPoolManager;
+  /** Processor-level lock; index instances are added, removed and closed under its write lock. */
+  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  /** One lock per index type: build tasks take its write lock, queries take its read lock. */
+  private Map<IndexType, ReadWriteLock> indexLockMap;
+
+  /**
+   * How many indexes of this IndexProcessor are currently inserting data. If 0, no indexes are
+   * inserting, that is, this IndexProcessor is not in flushing state.
+   */
+  private AtomicInteger numIndexBuildTasks;
+
+  /** Whether the processor has been closed */
+  private volatile boolean closed;
+
+  /** The map of index instances. */
+  private Map<IndexType, IoTDBIndex> allPathsIndexMap;
+
+  /** Some status data saved when index is closed for next open. */
+  private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+  /** Path of the file that {@code previousDataBufferMap} is serialized to. */
+  private final String previousDataBufferFile;
+
+  /**
+   * Each index instance corresponds to an {@linkplain IIndexUsable}. All IIndexUsable of a
+   * IndexProcessor are stored in one file.
+   */
+  private Map<IndexType, IIndexUsable> usableMap;
+
+  /** Path of the file that {@code usableMap} is serialized to. */
+  private final String usableFile;
+
+  /** The optimizer for the post-processing phase (refinement phase). Unused yet. */
+  private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+
+  /**
+   * Creates the processor of one IndexSeries: makes sure its directory exists, resolves the data
+   * type of the series and restores the state files written by a previous close, if present.
+   *
+   * @param indexSeries the IndexSeries managed by this processor
+   * @param indexSeriesDirPath the directory used by this processor
+   * @throws IndexRuntimeException if the data type of the IndexSeries cannot be determined
+   */
+  public IndexProcessor(PartialPath indexSeries, String indexSeriesDirPath) {
+    this.indexBuildPoolManager = IndexBuildTaskPoolManager.getInstance();
+
+    this.numIndexBuildTasks = new AtomicInteger(0);
+    this.indexSeries = indexSeries;
+    this.indexSeriesDirPath = indexSeriesDirPath;
+    File dir = IndexUtils.getIndexFile(indexSeriesDirPath);
+    // mkdirs() also returns false when someone else created the directory concurrently, so the
+    // existence is re-checked before reporting a failure.
+    if (!dir.exists() && !dir.mkdirs() && !dir.exists()) {
+      logger.warn("Failed to create index directory {}", indexSeriesDirPath);
+    }
+    this.closed = false;
+    this.allPathsIndexMap = new EnumMap<>(IndexType.class);
+    this.previousDataBufferMap = new EnumMap<>(IndexType.class);
+    this.indexLockMap = new EnumMap<>(IndexType.class);
+    this.usableMap = new EnumMap<>(IndexType.class);
+    this.previousDataBufferFile = indexSeriesDirPath + File.separator + "previousBuffer";
+    this.usableFile = indexSeriesDirPath + File.separator + "usableMap";
+    this.tsDataType = initSeriesType();
+    this.refinePhaseOptimizer = IIndexCandidateOrderOptimize.Factory.getOptimize();
+    // the maps and file paths above must be initialized before the persisted state is loaded
+    deserializePreviousBuffer();
+    deserializeUsable(indexSeries);
+  }
+
+  /**
+   * Resolves the data type used by the index instances of this processor. For a full path the type
+   * of that series is used; for a wildcard path the type of the first matched series is used, since
+   * all series covered by this IndexProcessor should have the same data type.
+   *
+   * @return tsDataType of this IndexProcessor
+   */
+  private TSDataType initSeriesType() {
+    try {
+      PartialPath representative = indexSeries;
+      if (!indexSeries.isFullPath()) {
+        // only one matched series is needed to learn the type
+        List<PartialPath> matched =
+            IoTDB.metaManager.getAllTimeseriesPathWithAlias(indexSeries, 1, 0).left;
+        if (matched.isEmpty()) {
+          throw new IndexRuntimeException("No series in the wildcard path");
+        }
+        representative = matched.get(0);
+      }
+      return MManager.getInstance().getSeriesType(representative);
+    } catch (MetadataException e) {
+      throw new IndexRuntimeException("get type failed. ", e);
+    }
+  }
+
+  /** Returns the directory path of one index type: the processor directory plus the type name. */
+  private String getIndexDir(IndexType indexType) {
+    return String.join(File.separator, indexSeriesDirPath, String.valueOf(indexType));
+  }
+
+  /**
+   * Writes {@code usableMap} to {@code usableFile}: the number of entries, then for each entry its
+   * serialized index type followed by the usability data. An I/O failure is logged and swallowed.
+   */
+  private void serializeUsable() {
+    File target = SystemFileFactory.INSTANCE.getFile(usableFile);
+    try (OutputStream out = new FileOutputStream(target)) {
+      ReadWriteIOUtils.write(usableMap.size(), out);
+      for (Entry<IndexType, IIndexUsable> usableEntry : usableMap.entrySet()) {
+        ReadWriteIOUtils.write(usableEntry.getKey().serialize(), out);
+        usableEntry.getValue().serialize(out);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize usability. Given up.", e);
+    }
+  }
+
+  /**
+   * Writes {@code previousDataBufferMap} to {@code previousDataBufferFile}: the number of entries,
+   * then for each entry its serialized index type followed by the buffer. An I/O failure is logged
+   * and swallowed.
+   */
+  private void serializePreviousBuffer() {
+    File target = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+    try (OutputStream out = new FileOutputStream(target)) {
+      ReadWriteIOUtils.write(previousDataBufferMap.size(), out);
+      for (Entry<IndexType, ByteBuffer> bufferEntry : previousDataBufferMap.entrySet()) {
+        ByteBuffer savedState = bufferEntry.getValue();
+        ReadWriteIOUtils.write(bufferEntry.getKey().serialize(), out);
+        ReadWriteIOUtils.write(savedState, out);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize previous buffer. Given up.", e);
+    }
+  }
+
+  /**
+   * Loads {@code previousDataBufferMap} from {@code previousDataBufferFile} if that file exists.
+   * Entries read before an I/O failure stay in the map; the failure is logged and swallowed.
+   */
+  private void deserializePreviousBuffer() {
+    File source = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+    if (source.exists()) {
+      try (InputStream in = new FileInputStream(source)) {
+        int remaining = ReadWriteIOUtils.readInt(in);
+        while (remaining-- > 0) {
+          IndexType type = IndexType.deserialize(ReadWriteIOUtils.readShort(in));
+          previousDataBufferMap.put(
+              type, ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(in));
+        }
+      } catch (IOException e) {
+        logger.error("Error when deserialize previous buffer. Given up.", e);
+      }
+    }
+  }
+
+  /**
+   * Loads {@code usableMap} from {@code usableFile} if that file exists. Entries read before a
+   * failure stay in the map; the failure is logged and swallowed.
+   *
+   * @param indexSeries the IndexSeries handed to the usability deserializer
+   */
+  private void deserializeUsable(PartialPath indexSeries) {
+    File source = SystemFileFactory.INSTANCE.getFile(usableFile);
+    if (source.exists()) {
+      try (InputStream in = new FileInputStream(source)) {
+        for (int remaining = ReadWriteIOUtils.readInt(in); remaining > 0; remaining--) {
+          IndexType type = IndexType.deserialize(ReadWriteIOUtils.readShort(in));
+          usableMap.put(type, IIndexUsable.Factory.deserializeIndexUsability(indexSeries, in));
+        }
+      } catch (IOException | IllegalPathException e) {
+        logger.error("Error when deserialize usability. Given up.", e);
+      }
+    }
+  }
+
+  /**
+   * Flush all index instances to disk except NoIndex. Waits until no build task is running, then
+   * closes every index instance, keeps the state each one returns for the next open, persists the
+   * usability info and the saved states, and marks this processor closed. A second call is a
+   * no-op. If the waiting thread is interrupted, the processor is left open.
+   *
+   * @param deleteFiles if true, additionally try to delete the directory of this processor
+   * @throws IOException if an index instance fails to close
+   */
+  @SuppressWarnings("squid:S2589")
+  public synchronized void close(boolean deleteFiles) throws IOException {
+    if (closed) {
+      return;
+    }
+    waitingFlushEndAndDo(
+        () -> {
+          lock.writeLock().lock();
+          try {
+            // store Preprocessor
+            for (Entry<IndexType, IoTDBIndex> entry : allPathsIndexMap.entrySet()) {
+              IndexType indexType = entry.getKey();
+              if (indexType == IndexType.NO_INDEX) {
+                continue;
+              }
+              IoTDBIndex index = entry.getValue();
+              previousDataBufferMap.put(entry.getKey(), index.closeAndRelease());
+            }
+            logger.info("close and release index processor: {}", indexSeries);
+            allPathsIndexMap.clear();
+            serializeUsable();
+            serializePreviousBuffer();
+            closed = true;
+            if (deleteFiles) {
+              File dir = IndexUtils.getIndexFile(indexSeriesDirPath);
+              // File#delete reports failure only through its return value and cannot remove a
+              // non-empty directory, so a failed deletion must not pass silently.
+              if (dir.exists() && !dir.delete()) {
+                logger.warn("Failed to delete index directory {}", indexSeriesDirPath);
+              }
+            }
+          } finally {
+            lock.writeLock().unlock();
+          }
+        });
+  }
+
+  /**
+   * Polls until no index build task is running, then runs the given action once. The polling
+   * interval starts at 100 ms and becomes 1000 ms (with a warning per poll) after 3 seconds.
+   *
+   * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and the
+   * method returns WITHOUT running the action.
+   *
+   * @param indexNaiveAction the action to run once the flush has ended
+   * @throws IOException if the action fails
+   */
+  private void waitingFlushEndAndDo(IndexNaiveFunc indexNaiveAction) throws IOException {
+    long waitingInterval = 100;
+    final long startTime = System.currentTimeMillis();
+    // wait the flushing end.
+    while (isFlushing()) {
+      try {
+        Thread.sleep(waitingInterval);
+      } catch (InterruptedException e) {
+        // restore the flag so that callers further up the stack can observe the interruption
+        Thread.currentThread().interrupt();
+        logger.error("interrupted, index insert may not complete.", e);
+        return;
+      }
+      long waitingTime = System.currentTimeMillis() - startTime;
+      // wait for too long time.
+      if (waitingTime > 3000) {
+        waitingInterval = 1000;
+        logger.warn(
+            "IndexFileProcessor {}: wait-close time {} ms is too long.", indexSeries, waitingTime);
+      }
+    }
+    indexNaiveAction.act();
+  }
+
+  /** Returns the IndexSeries that this processor manages. */
+  public PartialPath getIndexSeries() {
+    return indexSeries;
+  }
+
+  /** Returns the hash code of the IndexSeries; no other field takes part. */
+  @Override
+  public int hashCode() {
+    return indexSeries.hashCode();
+  }
+
+  /**
+   * Two processors are equal when they are of the same class and their IndexSeries compare as
+   * equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    IndexProcessor other = (IndexProcessor) obj;
+    return compareTo(other) == 0;
+  }
+
+  /** Returns the IndexSeries followed by the map of index instances. */
+  @Override
+  public String toString() {
+    return indexSeries + ": " + allPathsIndexMap;
+  }
+
+  /** Orders processors by their IndexSeries. */
+  @Override
+  public int compareTo(IndexProcessor o) {
+    return indexSeries.compareTo(o.indexSeries);
+  }
+
+  /** Returns true while the counter of running index build tasks is positive. */
+  private boolean isFlushing() {
+    return numIndexBuildTasks.get() > 0;
+  }
+
+  /**
+   * Checks, under the write lock, that a memtable flush may start on this processor. No state is
+   * changed by this method.
+   *
+   * @throws IndexRuntimeException if the processor is closed or a flush is still in progress
+   */
+  void startFlushMemTable() {
+    lock.writeLock().lock();
+    try {
+      if (closed) {
+        throw new IndexRuntimeException("closed index file !!!!!");
+      }
+      if (isFlushing()) {
+        throw new IndexRuntimeException("There has been a flushing, do you want to wait?");
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Insert sorted series into index instances. For every index instance except NO_INDEX, one build
+   * task is submitted to the build pool; each task holds the write lock of its index while it
+   * inserts. A series whose data type differs from the type of this processor is ignored.
+   *
+   * <p>{@code numIndexBuildTasks} is incremented once per submitted task and decremented once when
+   * that task finishes, so it always equals the number of outstanding tasks.
+   *
+   * @param path the path of time series
+   * @param tvList the sorted series
+   */
+  void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+    lock.writeLock().lock();
+    try {
+      if (tvList.getDataType() != tsDataType) {
+        logger.warn(
+            "TsDataType unmatched, ignore: indexSeries {}: {}, given series {}: {}",
+            indexSeries,
+            tsDataType,
+            path,
+            tvList.getDataType());
+        return;
+      }
+      // for every index of this path, submit a task to pool.
+      allPathsIndexMap.forEach(
+          (indexType, index) -> {
+            // NO_INDEX doesn't involve the phase of building index
+            if (indexType == IndexType.NO_INDEX) {
+              return;
+            }
+            // resolve the lock here, under the processor lock, rather than reading the map from
+            // the pool thread
+            ReadWriteLock indexLock = indexLockMap.get(indexType);
+            Runnable buildTask =
+                () -> {
+                  // locked before the try block so that finally never unlocks an unheld lock
+                  indexLock.writeLock().lock();
+                  try {
+                    IndexFeatureExtractor extractor = index.startFlushTask(path, tvList);
+                    while (extractor.hasNext()) {
+                      extractor.processNext();
+                      index.buildNext();
+                    }
+                    index.endFlushTask();
+                  } catch (IndexManagerException e) {
+                    // Give up the following data, but the previously serialized chunk will not be
+                    // affected.
+                    logger.error("build index failed", e);
+                  } catch (RuntimeException e) {
+                    logger.error("RuntimeException", e);
+                  } finally {
+                    numIndexBuildTasks.decrementAndGet();
+                    indexLock.writeLock().unlock();
+                  }
+                };
+            // one increment per submitted task, paired with the decrement inside the task
+            numIndexBuildTasks.incrementAndGet();
+            try {
+              indexBuildPoolManager.submit(buildTask);
+            } catch (RuntimeException e) {
+              // the task will never run, so undo its increment or waiters would block forever
+              numIndexBuildTasks.decrementAndGet();
+              throw e;
+            }
+          });
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Does not return until all index insertions have finished. If the waiting thread is
+   * interrupted, the wait is abandoned and this method returns early.
+   */
+  void endFlushMemTable() {
+    // wait until all flushing tasks end.
+    try {
+      waitingFlushEndAndDo(() -> {});
+    } catch (IOException ignored) {
+      // the action passed above is empty, so there is nothing to handle here
+    }
+  }
+
+  /**
+   * According to the passed-in {@code indexInfoMap}, refresh the index instances in the current
+   * IndexProcessor. When an index is created or dropped, the IndexProcessor is out of date, so it
+   * needs to be refreshed: index types missing here are constructed, and index instances whose type
+   * is no longer in the map are closed and removed together with their directory.
+   *
+   * @param indexInfoMap passed from {@link IIndexRouter}
+   */
+  public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap) {
+    lock.writeLock().lock();
+    try {
+      // Add indexes that are not in the previous map
+      for (Entry<IndexType, IndexInfo> entry : indexInfoMap.entrySet()) {
+        IndexType indexType = entry.getKey();
+        IndexInfo indexInfo = entry.getValue();
+        if (!allPathsIndexMap.containsKey(indexType)) {
+          IoTDBIndex index =
+              IndexType.constructIndex(
+                  indexSeries,
+                  tsDataType,
+                  getIndexDir(indexType),
+                  indexType,
+                  indexInfo,
+                  previousDataBufferMap.get(indexType));
+          allPathsIndexMap.putIfAbsent(indexType, index);
+          indexLockMap.putIfAbsent(indexType, new ReentrantReadWriteLock());
+          // putIfAbsent keeps a usability entry that was restored from disk
+          usableMap.putIfAbsent(
+              indexType, IIndexUsable.Factory.createEmptyIndexUsability(indexSeries));
+        }
+      }
+
+      // remove indexes that are removed from the previous map
+      for (IndexType indexType : new ArrayList<>(allPathsIndexMap.keySet())) {
+        if (indexInfoMap.containsKey(indexType)) {
+          continue;
+        }
+        try {
+          allPathsIndexMap.get(indexType).closeAndRelease();
+        } catch (IOException e) {
+          // pass the exception itself so that its stack trace is kept in the log
+          logger.warn("Meet error when close {} before removing index", indexType, e);
+        }
+        // remove index file directories
+        File dir = IndexUtils.getIndexFile(getIndexDir(indexType));
+        // File#delete reports failure only through its return value
+        if (dir.exists() && !dir.delete()) {
+          logger.warn("Failed to delete index directory {}", dir);
+        }
+        allPathsIndexMap.remove(indexType);
+        usableMap.remove(indexType);
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * For unsequence data, mark the time range covered by {@code tvList} as "index-unusable" in the
+   * IIndexUsable of every index type.
+   *
+   * @param path the path of time series
+   * @param tvList the unsequence data of the series
+   */
+  void updateUnsequenceData(PartialPath path, TVList tvList) {
+    for (IIndexUsable usable : this.usableMap.values()) {
+      usable.minusUsableRange(path, tvList.getMinTime(), tvList.getLastTime());
+    }
+  }
+
+  /**
+   * index query. The read lock of the queried index is taken while the processor read lock is
+   * held, and is released only after the query has returned.
+   *
+   * @param indexType the type of index to be queried
+   * @param queryProps the query conditions
+   * @param context the query context
+   * @param alignedByTime true if result series need to be aligned by the timestamp.
+   * @return index query result
+   * @throws QueryIndexException if no index of the given type exists on this IndexSeries, or if
+   *     the index query itself fails
+   */
+  public QueryDataSet query(
+      IndexType indexType,
+      Map<String, Object> queryProps,
+      QueryContext context,
+      boolean alignedByTime)
+      throws QueryIndexException {
+    ReadWriteLock indexLock;
+    lock.readLock().lock();
+    try {
+      indexLock = indexLockMap.get(indexType);
+      if (indexLock == null) {
+        throw new QueryIndexException(
+            String.format("%s hasn't been built on %s", indexType, indexSeries.getFullPath()));
+      }
+      indexLock.readLock().lock();
+    } finally {
+      lock.readLock().unlock();
+    }
+    // from here on the index read lock is known to be held, so it is safe to unlock in finally
+    try {
+      IoTDBIndex index = allPathsIndexMap.get(indexType);
+      if (index == null) {
+        // the lock of a dropped index stays in indexLockMap while its instance is removed
+        throw new QueryIndexException(
+            String.format("%s hasn't been built on %s", indexType, indexSeries.getFullPath()));
+      }
+      return index.query(
+          queryProps, this.usableMap.get(indexType), context, refinePhaseOptimizer, alignedByTime);
+    } finally {
+      indexLock.readLock().unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
new file mode 100644
index 0000000..984e4ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.common.DistSeries;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * For index developers, the indexing framework aims to provide a simple and friendly platform and
+ * shield the complex details in other modules.
+ *
+ * <p>To add a new index methods, developers need inherit {@linkplain IoTDBIndex} or its subclass.
+ */
+public abstract class IoTDBIndex {
+
+  /** The IndexSeries this index instance is built on. */
+  protected final PartialPath indexSeries;
+  /** The type of this index, taken from the IndexInfo passed to the constructor. */
+  protected final IndexType indexType;
+  /** The data type of the indexed series. */
+  protected final TSDataType tsDataType;
+
+  /** The properties of this index, taken from the IndexInfo passed to the constructor. */
+  protected final Map<String, String> props;
+  /** The feature extractor; subclasses set it in {@link #initFeatureExtractor}. May be null. */
+  protected IndexFeatureExtractor indexFeatureExtractor;
+
+  /**
+   * @param indexSeries the IndexSeries this index is built on
+   * @param tsDataType the data type of the indexed series
+   * @param indexInfo supplies the index type and the index properties
+   */
+  public IoTDBIndex(PartialPath indexSeries, TSDataType tsDataType, IndexInfo indexInfo) {
+    this.indexSeries = indexSeries;
+    this.indexType = indexInfo.getIndexType();
+    this.props = indexInfo.getProps();
+    this.tsDataType = tsDataType;
+  }
+
+  /**
+   * An index should determine which FeatureExtractor it uses and hook it to {@link
+   * #indexFeatureExtractor}. This method is called when IoTDBIndex is created.
+   *
+   * @param previous the status data saved in the last closing of the FeatureExtractor
+   * @param inQueryMode true if it's during index query, false if it's during index building
+   */
+  public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+  /** A new item has been pre-processed by the FeatureExtractor, now the index can insert it. */
+  public abstract boolean buildNext() throws IndexManagerException;
+
+  /** This index will be closed, it's time to serialize in-memory data to disk for next open. */
+  protected abstract void flushIndex();
+
+  /**
+   * execute index query and return the result.
+   *
+   * @param queryProps query conditions
+   * @param iIndexUsable the information of index usability
+   * @param context query context provided by IoTDB.
+   * @param candidateOrderOptimize an optimizer for the order of visiting candidates
+   * @param alignedByTime true if the result series need to be aligned by timestamp, otherwise they
+   *     will be aligned by their first points
+   * @return the result should be consistent with other IoTDB query result.
+   * @throws QueryIndexException if the query fails
+   */
+  public abstract QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexCandidateOrderOptimize candidateOrderOptimize,
+      boolean alignedByTime)
+      throws QueryIndexException;
+
+  /**
+   * In current design, the index building (insert data) only occurs in the memtable flush. When
+   * this method is called, a batch of raw data is coming.
+   *
+   * @param partialPath the path of the series the data belongs to (not used by this base method)
+   * @param tvList tvList to insert
+   * @return FeatureExtractor filled with the given raw data
+   */
+  public IndexFeatureExtractor startFlushTask(PartialPath partialPath, TVList tvList) {
+    this.indexFeatureExtractor.appendNewSrcData(tvList);
+    return indexFeatureExtractor;
+  }
+
+  /** The flush task has ended; lets the feature extractor drop the processed source data. */
+  public void endFlushTask() {
+    indexFeatureExtractor.clearProcessedSrcData();
+  }
+
+  /**
+   * Close the index, release resources of the index structure and the feature extractor.
+   *
+   * @return the state returned by the feature extractor on closing, or an empty buffer when this
+   *     index has no feature extractor
+   * @throws IOException if closing fails
+   */
+  public ByteBuffer closeAndRelease() throws IOException {
+    flushIndex();
+    if (indexFeatureExtractor != null) {
+      return indexFeatureExtractor.closeAndRelease();
+    } else {
+      return ByteBuffer.allocate(0);
+    }
+  }
+
+  public TSDataType getTsDataType() {
+    return tsDataType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  /** Returns the string form of the index type. */
+  @Override
+  public String toString() {
+    return indexType.toString();
+  }
+
+  /**
+   * Builds the result data set, limited to the configured maximum index query result size.
+   *
+   * @param res the result series
+   * @param alignedByTime must be false; aligning by time is not supported
+   * @return the result data set
+   * @throws QueryIndexException if {@code alignedByTime} is true
+   */
+  protected QueryDataSet constructSearchDataset(List<DistSeries> res, boolean alignedByTime)
+      throws QueryIndexException {
+    return constructSearchDataset(
+        res, alignedByTime, IoTDBDescriptor.getInstance().getConfig().getMaxIndexQueryResultSize());
+  }
+
+  /**
+   * Builds the result data set from the first {@code nMaxReturnSeries} result series. Each series
+   * becomes one column; row {@code i} holds the i-th point of every series and uses {@code i} as
+   * its timestamp. The number of rows is the length of the shortest returned series.
+   *
+   * @param res the result series
+   * @param alignedByTime must be false; aligning by time is not supported
+   * @param nMaxReturnSeries the maximum number of series to return; a non-positive value yields an
+   *     empty data set
+   * @return the result data set
+   * @throws QueryIndexException if {@code alignedByTime} is true
+   */
+  protected QueryDataSet constructSearchDataset(
+      List<DistSeries> res, boolean alignedByTime, int nMaxReturnSeries)
+      throws QueryIndexException {
+    if (alignedByTime) {
+      throw new QueryIndexException("Unsupported alignedByTime result");
+    }
+    // A negative limit is clamped to 0: it would otherwise slip past the empty-result guard below
+    // and make the row loop run up to Integer.MAX_VALUE times, emitting empty records.
+    final int nReturnSeries = Math.max(0, Math.min(nMaxReturnSeries, res.size()));
+    // make result paths and types
+    List<PartialPath> paths = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    Map<String, Integer> pathToIndex = new HashMap<>();
+    int nMinLength = Integer.MAX_VALUE;
+    for (int i = 0; i < nReturnSeries; i++) {
+      PartialPath series = res.get(i).partialPath;
+      paths.add(series);
+      pathToIndex.put(series.getFullPath(), i);
+      types.add(tsDataType);
+      nMinLength = Math.min(nMinLength, res.get(i).tvList.size());
+    }
+    IndexQueryDataSet dataSet = new IndexQueryDataSet(paths, types, pathToIndex);
+    if (nReturnSeries == 0) {
+      return dataSet;
+    }
+    for (int row = 0; row < nMinLength; row++) {
+      RowRecord record = new RowRecord(row);
+      for (int col = 0; col < nReturnSeries; col++) {
+        TVList tvList = res.get(col).tvList;
+        record.addField(IndexUtils.getValue(tvList, row), tsDataType);
+      }
+      dataSet.putRecord(record);
+    }
+    return dataSet;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
new file mode 100644
index 0000000..1e9ff88
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * NoIndex does nothing on feature extraction and data pruning. Its index-available range is always
+ * empty.
+ */
+public class NoIndex extends IoTDBIndex {
+
+  /** Passes all arguments through to {@link IoTDBIndex}. */
+  public NoIndex(PartialPath path, TSDataType tsDataType, IndexInfo indexInfo) {
+    super(path, tsDataType, indexInfo);
+  }
+
+  /** No feature extractor is created, so the extractor field of the base class is left unset. */
+  @Override
+  public void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode) {
+    // NoIndex does nothing
+  }
+
+  /** Inserts nothing and always returns true. */
+  @Override
+  public boolean buildNext() {
+    return true;
+  }
+
+  /** There is no in-memory data to persist. */
+  @Override
+  protected void flushIndex() {
+    // NoIndex does nothing
+  }
+
+  /** Ignores all arguments and returns a data set with no paths, no types and no records. */
+  @Override
+  public QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexCandidateOrderOptimize candidateOrderOptimize,
+      boolean alignedByTime) {
+    return new IndexQueryDataSet(
+        Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
new file mode 100644
index 0000000..aa50be1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+/**
+ * A plain holder that pairs a series (its path and data) with a distance value.
+ *
+ * <p>NOTE(review): {@code dist} is presumably the distance between this series and a query
+ * pattern - confirm against the index implementations that create instances.
+ */
+public class DistSeries {
+
+  public double dist;
+  public TVList tvList;
+  public PartialPath partialPath;
+
+  /**
+   * @param dist the distance value of the series
+   * @param tvList the data of the series
+   * @param partialPath the path of the series
+   */
+  public DistSeries(double dist, TVList tvList, PartialPath partialPath) {
+    this.dist = dist;
+    this.tvList = tvList;
+    this.partialPath = partialPath;
+  }
+
+  /** Formats the holder as {@code (path,dist:data)}. */
+  @Override
+  public String toString() {
+    return "(" + partialPath + "," + dist + ":" + tvList + ")";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
index d971aa7..3c0cd32 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
@@ -19,18 +19,72 @@ package org.apache.iotdb.db.index.common;
 
 public class IndexConstant {
 
+  // SQL show
+  public static final String ID = "ID";
+  public static final String NON_IMPLEMENTED_MSG = "Not implemented yet";
+
+  public static final String ROUTER_DIR = "router";
+  public static final String META_DIR_NAME = "index_meta";
+  public static final String INDEX_DATA_DIR_NAME = "index_data";
+  public static final String STORAGE_GROUP_INDEXING_SUFFIX = ".sg_indexing";
+  public static final String STORAGE_GROUP_INDEXED_SUFFIX = ".sg_index";
+
+  public static final String INDEXING_SUFFIX = ".indexing";
+  public static final String INDEXED_SUFFIX = ".index";
+
   // whole matching
   public static final int NON_SET_TOP_K = -1;
   public static final String TOP_K = "TOP_K";
 
+  // subsequence matching: sliding window
+  public static final String INDEX_WINDOW_RANGE = "INDEX_WINDOW_RANGE";
+  public static final String INDEX_RANGE_STRATEGY = "INDEX_RANGE_STRATEGY";
+  public static final String INDEX_SLIDE_STEP = "INDEX_SLIDE_STEP";
+
+  public static final String INDEX_MAGIC = "IoTDBIndex";
+  public static final String DEFAULT_PROP_NAME = "DEFAULT";
+
+  public static final int INDEX_MAP_INIT_RESERVE_SIZE = 5;
+
   public static final String PATTERN = "PATTERN";
   public static final String THRESHOLD = "THRESHOLD";
+  public static final String BORDER = "BORDER";
+
+  // MBR Index parameters
+  public static final String FEATURE_DIM = "FEATURE_DIM";
+  public static final String DEFAULT_FEATURE_DIM = "4";
+  public static final String SEED_PICKER = "SEED_PICKER";
+  public static final String MAX_ENTRIES = "MAX_ENTRIES";
+  public static final String MIN_ENTRIES = "MIN_ENTRIES";
 
   // RTree PAA parameters
   public static final String PAA_DIM = "PAA_DIM";
+  public static final String SERIES_LENGTH = "SERIES_LENGTH";
+  public static final String DEFAULT_SERIES_LENGTH = "16";
+  public static final String DEFAULT_RTREE_PAA_DISTANCE = "2";
+
+  // Distance
+  public static final String DISTANCE = "DISTANCE";
+  public static final String L_INFINITY = "L_INFINITY";
+  public static final String DEFAULT_DISTANCE = "2";
+
+  // ELB Type
+  public static final String ELB_TYPE = "ELB_TYPE";
+  public static final String ELB_TYPE_ELE = "ELE";
+  public static final String ELB_TYPE_SEQ = "SEQ";
+  public static final String DEFAULT_ELB_TYPE = "SEQ";
+  public static final int DEFAULT_BLOCK_SIZE = 20;
 
   // ELB: calc param
   public static final String BLOCK_SIZE = "BLOCK_SIZE";
+  public static final String ELB_CALC_PARAM = "ELB_CALC_PARAM";
+  public static final String DEFAULT_ELB_CALC_PARAM = "SINGLE";
+  public static final String ELB_CALC_PARAM_SINGLE = "SINGLE";
+  public static final String ELB_THRESHOLD_BASE = "ELB_THRESHOLD_BASE";
+  public static final String ELB_THRESHOLD_RATIO = "ELB_THRESHOLD_RATIO";
+  public static final double ELB_DEFAULT_THRESHOLD_RATIO = 0.1;
+
+  public static final String MISSING_PARAM_ERROR_MESSAGE = "missing parameter: %s";
 
   private IndexConstant() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
new file mode 100644
index 0000000..e2e67ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IndexInfo implements Cloneable {
+
+  private Map<String, String> props;
+  private long time;
+  private IndexType indexType;
+
+  public IndexInfo(IndexType indexType, long time, Map<String, String> props) {
+    this.props = props;
+    this.time = time;
+    this.indexType = indexType;
+  }
+
+  public Map<String, String> getProps() {
+    return props;
+  }
+
+  public void setProps(Map<String, String> props) {
+    this.props = props;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public void setTime(long time) {
+    this.time = time;
+  }
+
+  public void setIndexType(IndexType indexType) {
+    this.indexType = indexType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  @Deprecated
+  public String serializeCreateIndex(String path) {
+    StringBuilder res = new StringBuilder();
+    res.append( // header fields: operation type, path, serialized index type, time
+        String.format(
+            "%s,%s,%s,%s", MetadataOperationType.CREATE_INDEX, path, indexType.serialize(), time));
+    if (props != null && !props.isEmpty()) {
+      for (Map.Entry<String, String> entry : props.entrySet()) { // appended as ",key=value"
+        res.append(String.format(",%s=%s", entry.getKey(), entry.getValue()));
+      }
+    }
+    return res.toString();
+  }
+
+  /**
+   * @param args [0] MetadataType, [1] path, [2] index type, [3] time, then {@code k=v} props.
+   * @return parsed IndexInfo; its props map is null when no {@code k=v} argument is present.
+   */
+  @Deprecated
+  public static IndexInfo deserializeCreateIndex(String[] args) {
+    IndexType indexType = IndexType.deserialize(Short.parseShort(args[2]));
+    long time = Long.parseLong(args[3]);
+    HashMap<String, String> indexProps = null;
+    if (args.length > 4) {
+      String[] kv;
+      indexProps = new HashMap<>(args.length - 4 + 1, 1);
+      for (int k = 4; k < args.length; k++) {
+        kv = args[k].split("=", 2); // limit 2: keeps '=' inside values and empty values
+        indexProps.put(kv[0], kv[1]);
+      }
+    }
+    return new IndexInfo(indexType, time, indexProps);
+  }
+
+  @Deprecated
+  public static String serializeDropIndex(String path, IndexType indexType) {
+    return String.format("%s,%s,%s", MetadataOperationType.DROP_INDEX, path, indexType.serialize());
+  }
+
+  /**
+   * @param args [0] is the MetadataType, [1] is the path, [2] is the serialized index type.
+   * @return parsed IndexType
+   */
+  @Deprecated
+  public static IndexType deserializeDropIndex(String[] args) {
+    return IndexType.deserialize(Short.parseShort(args[2]));
+  }
+
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(indexType.serialize(), outputStream);
+    ReadWriteIOUtils.write(time, outputStream);
+    ReadWriteIOUtils.write(props, outputStream);
+  }
+
+  public static IndexInfo deserialize(InputStream inputStream) throws IOException {
+    short indexTypeShort = ReadWriteIOUtils.readShort(inputStream);
+    IndexType indexType = IndexType.deserialize(indexTypeShort);
+    long time = ReadWriteIOUtils.readLong(inputStream);
+    Map<String, String> indexProps = ReadWriteIOUtils.readMap(inputStream);
+    return new IndexInfo(indexType, time, indexProps);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("[type: %s, time: %d, props: %s]", indexType, time, props);
+  }
+
+  @Override
+  public Object clone() { // copies props so the clone does not share the map; null stays null
+    return new IndexInfo(indexType, time, props == null ? null : new HashMap<>(props));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
new file mode 100644
index 0000000..ae3a875
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class IndexProcessorStruct {
+
+  public IndexProcessor processor;
+  public PartialPath representativePath;
+  public Map<IndexType, IndexInfo> infos;
+
+  public IndexProcessorStruct(
+      IndexProcessor processor, PartialPath representativePath, Map<IndexType, IndexInfo> infos) {
+    this.processor = processor;
+    this.representativePath = representativePath;
+    this.infos = infos;
+  }
+
+  public List<StorageGroupProcessor> addMergeLock() throws StorageEngineException {
+    return StorageEngine.getInstance().mergeLock(Collections.singletonList(representativePath));
+  }
+
+  @Override
+  public String toString() {
+    return "<" + infos + "\n" + processor + ">";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
index 177e7bd..ec94158 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
@@ -19,13 +19,20 @@
 package org.apache.iotdb.db.index.common;
 
 import org.apache.iotdb.db.exception.index.UnsupportedIndexTypeException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.algorithm.NoIndex;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 public enum IndexType {
   NO_INDEX,
   RTREE_PAA,
-  ELB_INDEX,
-  KV_INDEX;
+  ELB_INDEX;
 
   /**
    * judge the index type.
@@ -41,17 +48,11 @@ public enum IndexType {
         return RTREE_PAA;
       case 2:
         return ELB_INDEX;
-      case 3:
-        return KV_INDEX;
       default:
         throw new NotImplementedException("Given index is not implemented");
     }
   }
 
-  public static int getSerializedSize() {
-    return Short.BYTES;
-  }
-
   /**
    * judge the index deserialize type.
    *
@@ -65,8 +66,6 @@ public enum IndexType {
         return 1;
       case ELB_INDEX:
         return 2;
-      case KV_INDEX:
-        return 3;
       default:
         throw new NotImplementedException("Given index is not implemented");
     }
@@ -81,4 +80,41 @@ public enum IndexType {
       throw new UnsupportedIndexTypeException(indexTypeString);
     }
   }
+
+  private static IoTDBIndex newIndexByType(
+      PartialPath path,
+      TSDataType tsDataType,
+      String indexDir,
+      IndexType indexType,
+      IndexInfo indexInfo) {
+    switch (indexType) {
+      case NO_INDEX:
+        return new NoIndex(path, tsDataType, indexInfo);
+      case ELB_INDEX:
+        //        return new ELBIndex(path, tsDataType, indexDir, indexInfo);
+      case RTREE_PAA:
+        //        return new RTreePAAIndex(path, tsDataType, indexDir, indexInfo);
+      default:
+        throw new NotImplementedException("unsupported index type:" + indexType);
+    }
+  }
+
+  public static IoTDBIndex constructIndex(
+      PartialPath indexSeries,
+      TSDataType tsDataType,
+      String indexDir,
+      IndexType indexType,
+      IndexInfo indexInfo,
+      ByteBuffer previous) {
+    indexInfo.setProps(uppercaseStringProps(indexInfo.getProps()));
+    IoTDBIndex index = newIndexByType(indexSeries, tsDataType, indexDir, indexType, indexInfo);
+    index.initFeatureExtractor(previous, false);
+    return index;
+  }
+
+  private static Map<String, String> uppercaseStringProps(Map<String, String> props) {
+    Map<String, String> uppercase = new HashMap<>(props.size());
+    props.forEach((k, v) -> uppercase.put(k.toUpperCase(), v.toUpperCase()));
+    return uppercase;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
index 98809f0..f930cff 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
@@ -17,8 +17,40 @@
  */
 package org.apache.iotdb.db.index.common;
 
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class IndexUtils {
 
+  /**
+   * Judge whether the fullPath matches the pathWithStar. The two paths must have the same number
+   * of nodes. Refer to {@code org.apache.iotdb.db.metadata.MManager#match}
+   *
+   * @param pathWithStar a path with wildcard characters.
+   * @param fullPath a full path without wildcard characters.
+   * @return true if fullPath matches pathWithStar.
+   */
+  public static boolean match(PartialPath pathWithStar, PartialPath fullPath) {
+    String[] fullNodes = fullPath.getNodes();
+    String[] starNodes = pathWithStar.getNodes();
+    if (starNodes.length != fullNodes.length) {
+      return false;
+    }
+    for (int i = 0; i < fullNodes.length; i++) {
+      if (!"*".equals(starNodes[i]) && !fullNodes[i].equals(starNodes[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   public static String removeQuotation(String v) {
     int start = 0;
     int end = v.length();
@@ -31,5 +63,48 @@ public class IndexUtils {
     return v.substring(start, end);
   }
 
+  public static File getIndexFile(String filePath) {
+    return SystemFileFactory.INSTANCE.getFile(filePath);
+  }
+
   private IndexUtils() {}
+
+  public static Map<String, Object> toLowerCaseProps(Map<String, Object> props) {
+    Map<String, Object> uppercase = new HashMap<>(props.size()); // NOTE: upper-cases, despite name
+    for (Entry<String, Object> entry : props.entrySet()) {
+      String k = entry.getKey();
+      Object v = entry.getValue();
+      if (v instanceof String) {
+        uppercase.put(k.toUpperCase(), ((String) v).toUpperCase()); // key and String value
+      } else {
+        uppercase.put(k.toUpperCase(), v); // non-String values are kept as-is
+      }
+    }
+    return uppercase;
+  }
+
+  public static Object getValue(TVList srcData, int idx) { // boxed value at idx, by data type
+    switch (srcData.getDataType()) {
+      case INT32:
+        return srcData.getInt(idx);
+      case INT64:
+        return srcData.getLong(idx);
+      case FLOAT:
+        return srcData.getFloat(idx);
+      case DOUBLE:
+        return (float) srcData.getDouble(idx); // NOTE(review): narrows double to float; confirm
+      default: // every other data type is rejected
+        throw new NotImplementedException(srcData.getDataType().toString());
+    }
+  }
+
+  /**
+   * "*" is illegal in Windows directory paths. Replace it with "#".
+   *
+   * @param previousDir path which may contain "*"
+   * @return path with every "*" replaced by "#"
+   */
+  public static String removeIllegalStarInDir(String previousDir) {
+    return previousDir.replace('*', '#');
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
new file mode 100644
index 0000000..3ebca7c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Map;
+
+/**
+ * Maps an index series path and its index infos to an {@link IndexProcessor}.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act(PartialPath indexSeries, Map indexInfoMap)}.
+ */
+@FunctionalInterface
+public interface CreateIndexProcessorFunc {
+
+  /** Returns the {@link IndexProcessor} for the given series and its per-type index infos. */
+  IndexProcessor act(PartialPath indexSeries, Map<IndexType, IndexInfo> indexInfoMap);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
new file mode 100644
index 0000000..6641f60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import java.io.IOException;
+
+/**
+ * Do something without input and output.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act()}.
+ */
+@FunctionalInterface
+public interface IndexNaiveFunc {
+
+  /** Do something. */
+  void act() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
new file mode 100644
index 0000000..64baed2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.common.math;
+
+import org.apache.iotdb.db.index.common.math.probability.UniformProba;
+import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Random;
+
+public class Randomwalk {
+
+  public static TVList generateRanWalkTVList(long length, long seed, float R, float miu) {
+    TVList res = TVListAllocator.getInstance().allocate(TSDataType.DOUBLE);
+    double lastPoint = R; // base value that the first random step is added to
+    Random r = new Random(seed); // seeded, so the same arguments give the same series
+    UniformProba uniform = new UniformProba(miu / 2, -miu / 2, r); // steps in [-miu/2, miu/2)
+    for (long i = 0; i < length; i++) { // long index: an int would overflow for length > 2^31-1
+      lastPoint = lastPoint + uniform.getNextRandom();
+      res.putDouble(i, lastPoint);
+    }
+    return res;
+  }
+
+  public static TVList generateRanWalkTVList(long length) {
+    return generateRanWalkTVList(length, 0, 0, 1);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
index 7b8b8cf..044f915 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
@@ -15,15 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+public abstract class Probability {
 
-public class QueryIndexException extends QueryProcessException {
-
-  private static final long serialVersionUID = 9019233783504576296L;
-
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
-  }
+  public abstract double getNextRandom();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
index 7b8b8cf..909d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
@@ -15,15 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import java.util.Random;
 
-public class QueryIndexException extends QueryProcessException {
+public class UniformProba extends Probability {
 
-  private static final long serialVersionUID = 9019233783504576296L;
+  private final double upBound;
+  private final double downBound;
+  private final double range;
+  private Random r;
 
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
+  public UniformProba(double upBound, double downBound, Random r) {
+    assert upBound > downBound;
+    this.upBound = upBound;
+    this.downBound = downBound;
+    this.range = upBound - downBound;
+    this.r = r;
+  }
+
+  @Override
+  public double getNextRandom() {
+    return r.nextDouble() * range + downBound;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
new file mode 100644
index 0000000..d1faa1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.feature;
+
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.NON_IMPLEMENTED_MSG;
+
+/**
+ * For all indexes, the raw input sequence has to be pre-processed before it's organized by indexes.
+ * In general, index structure needn't maintain all of original data, but only pointers (e.g. the
+ * identifier [start_time, end_time, series_path] can identify a time sequence uniquely).
+ *
+ * <p>By and large, a similarity index assumes the input data are ideal: fixed dimension, equal
+ * interval, no missing values and even no outliers. However, in real scenarios, the input series
+ * may contain missing values (thus it's not dimension-fixed) and the point's timestamp may contain
+ * slight offset (thus they are not equal-interval). IndexFeatureExtractor needs to preprocess the
+ * series and obtain a clean series to insert.
+ *
+ * <p>Many indexes will further extract features of aligned sequences, such as PAA, SAX, FFT, etc.
+ *
+ * <p>In summary, the IndexFeatureExtractor can provide three-level information:
+ *
+ * <ul>
+ *   <li>L1: a triplet to identify a series: {@code {StartTime, EndTime, Length}} (not submitted in
+ *       this pr)
+ *   <li>L2: aligned sequence: {@code {a1, a2, ..., an}}
+ *   <li>L3: feature: {@code {C1, C2, ..., Cm}}
+ * </ul>
+ */
+public abstract class IndexFeatureExtractor {
+
+  /** In the BUILD and QUERY modes, the IndexFeatureExtractor may work differently. */
+  protected boolean inQueryMode;
+
+  IndexFeatureExtractor(boolean inQueryMode) {
+    this.inQueryMode = inQueryMode;
+  }
+
+  /**
+   * Input a list of new data into the FeatureProcessor.
+   *
+   * @param newData new coming data.
+   */
+  public abstract void appendNewSrcData(TVList newData);
+
+  /**
+   * Input a list of new data into the FeatureProcessor. Due to the existing code, IoTDB generates
+   * TVList in the insert phase, but obtains BatchData in the query phase.
+   *
+   * @param newData new coming data.
+   */
+  public abstract void appendNewSrcData(BatchData newData);
+
+  /** Having done {@code appendNewSrcData}, the index framework will check {@code hasNext}. */
+  public abstract boolean hasNext();
+
+  /**
+   * If {@code hasNext} returns true, the index framework will call {@code processNext} which
+   * prepares a new series item (ready for insert).
+   */
+  public abstract void processNext();
+
+  /**
+   * After processing a batch of data, the index framework may call {@code clearProcessedSrcData} to
+   * clean out the processed data for releasing memory.
+   */
+  public abstract void clearProcessedSrcData();
+
+  /**
+   * close this extractor and release all allocated resources (TVList and PrimitiveList). It also
+   * returns the data saved for next open.
+   *
+   * @return the data saved for next open.
+   */
+  public abstract ByteBuffer closeAndRelease() throws IOException;
+
+  /** @return current L2: aligned sequence. */
+  public Object getCurrent_L2_AlignedSequence() {
+    throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+  }
+
+  /** @return current L3: customized feature. */
+  public Object getCurrent_L3_Feature() {
+    throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
new file mode 100644
index 0000000..8d95171
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class IndexQueryDataSet extends ListDataSet {
+
+  private Map<String, Integer> pathToIndex;
+
+  public IndexQueryDataSet(
+      List<PartialPath> paths, List<TSDataType> dataTypes, Map<String, Integer> pathToIndex) {
+    super(paths, dataTypes);
+    this.pathToIndex = pathToIndex;
+  }
+
+  public Map<String, Integer> getPathToIndex() {
+    return pathToIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
new file mode 100644
index 0000000..c7f07a5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.util.List;
+import java.util.Map;
+
+/** the executor for index query */
+public class QueryIndexExecutor {
+
+  private final Map<String, Object> queryProps;
+  private final IndexType indexType;
+  private final QueryContext context;
+  private final List<PartialPath> paths;
+  private final boolean alignedByTime;
+
+  public QueryIndexExecutor(QueryIndexPlan queryIndexPlan, QueryContext context) {
+    this.indexType = queryIndexPlan.getIndexType();
+    this.queryProps = IndexUtils.toLowerCaseProps(queryIndexPlan.getProps());
+    this.paths = queryIndexPlan.getPaths();
+    this.alignedByTime = queryIndexPlan.isAlignedByTime();
+    this.paths.forEach(PartialPath::toLowerCase);
+    this.context = context;
+  }
+
+  public QueryDataSet executeIndexQuery() throws StorageEngineException, QueryIndexException {
+    // get all related storage group
+    return IndexManager.getInstance()
+        .queryIndex(paths, indexType, queryProps, context, alignedByTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
new file mode 100644
index 0000000..c9dcaee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+/**
+ * In the indexing mechanism, this refers to optimizing the access order of the candidate series
+ * after the pruning phase. Because the candidate order given by the index may not match the file
+ * organization of TsFile, the access order of the refinement phase advised by the index may be
+ * inefficient. This query optimizer can be used to rearrange the candidate set access order.
+ *
+ * <p>The interface currently declares no methods; it only hosts the nested {@link Factory}.
+ */
+public interface IIndexCandidateOrderOptimize {
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer: this class only exposes static methods
+    }
+
+    /** @return a new {@code NoCandidateOrderOptimizer} on every call */
+    public static IIndexCandidateOrderOptimize getOptimize() {
+      return new NoCandidateOrderOptimizer();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
new file mode 100644
index 0000000..0354bf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+/**
+ * An {@link IIndexCandidateOrderOptimize} implementation that declares no members of its own. It
+ * is the implementation returned by {@code IIndexCandidateOrderOptimize.Factory.getOptimize()}.
+ */
+class NoCandidateOrderOptimizer implements IIndexCandidateOrderOptimize {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
new file mode 100644
index 0000000..bc6cfdd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Singleton pattern.
+ *
+ * <p>NOTE(review): {@link Factory#getIndexRouter(String)} below creates a new {@code
+ * ProtoIndexRouter} on every call, so any singleton guarantee has to come from the caller —
+ * confirm.
+ *
+ * <p>Firstly, IIndexRouter is responsible for index metadata management. More importantly, it
+ * routes create/drop/insert/query commands to the corresponding index processors.
+ *
+ * <p>IIndexRouter decouples the mapping between {@link
+ * org.apache.iotdb.db.index.algorithm.IoTDBIndex IoTDBIndex} and {@link IndexProcessor}, which may
+ * be re-designed in the future, from {@link org.apache.iotdb.db.index.IndexManager IndexManager}.
+ */
+public interface IIndexRouter {
+
+  /**
+   * Add a new index into the router.
+   *
+   * @param prefixPath the partial path of the given index series
+   * @param indexInfo the index information.
+   * @param func a function to create a new IndexProcessor, if it has not been created before.
+   * @param doSerialize true to serialize the new information immediately.
+   * @return true if the index information was added successfully
+   * @throws MetadataException if the index cannot be registered
+   */
+  boolean addIndexIntoRouter(
+      PartialPath prefixPath,
+      IndexInfo indexInfo,
+      CreateIndexProcessorFunc func,
+      boolean doSerialize)
+      throws MetadataException;
+
+  /**
+   * Remove an existing index from the router.
+   *
+   * @param prefixPath the partial path of the given index series
+   * @param indexType the type of index to be removed.
+   * @return true if the index information was removed successfully
+   * @throws MetadataException if the index cannot be removed
+   * @throws IOException if an I/O error occurs during removal
+   */
+  boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType)
+      throws MetadataException, IOException;
+
+  /**
+   * Look up the index information registered for an index series.
+   *
+   * @param indexSeries the index series to look up
+   * @return the registered index information, keyed by index type
+   */
+  Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+  /** @return the {@link IndexProcessorStruct} of every index series known to this router */
+  Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+  /**
+   * Find the index processors responsible for a time series.
+   *
+   * @param timeSeries the time series path
+   * @return the processors routed to by this path
+   */
+  Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+  /**
+   * Serialize all index information and processors to the disk.
+   *
+   * @param doClose true to close the processors after serialization.
+   */
+  void serialize(boolean doClose);
+
+  /**
+   * Deserialize all index information and processors into the memory.
+   *
+   * @param func a function to create the IndexProcessor of each reloaded index
+   */
+  void deserializeAndReload(CreateIndexProcessorFunc func);
+
+  /**
+   * Return a subset of the original IIndexRouter, intended for concurrent access.
+   *
+   * @param storageGroupPath the path of a storageGroup
+   * @return a subset of the original IIndexRouter
+   */
+  IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+  /**
+   * @return the number of indexes held by this router. NOTE(review): {@code ProtoIndexRouter}
+   *     counts index series (full-path plus wildcard entries), not individual index types —
+   *     confirm this is the intended meaning.
+   */
+  int getIndexNum();
+
+  /**
+   * Prepare the necessary information for an index query.
+   *
+   * @param partialPath the query path
+   * @param indexType the index type
+   * @param context the query context
+   * @return the necessary information for this query
+   * @throws QueryIndexException if the query cannot be served for this path
+   */
+  IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException;
+
+  /**
+   * Do something when the query ends.
+   *
+   * @param indexSeries the query path
+   * @param indexType the index type
+   * @param context the query context
+   */
+  void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer: this class only exposes static methods
+    }
+
+    /**
+     * @param routerDir the directory handed to the {@code ProtoIndexRouter} constructor
+     * @return a new {@code ProtoIndexRouter}
+     */
+    public static IIndexRouter getIndexRouter(String routerDir) {
+      return new ProtoIndexRouter(routerDir);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
new file mode 100644
index 0000000..0892758
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A proto implementation.
+ *
+ * <p>The subsequence-matching index is created on a single time series, while the whole-matching
+ * index is created on a group of time series with wildcards, so {@code ProtoIndexRouter} manages
+ * the two cases with different Map structures.
+ *
+ * <p>A key function of {@code IIndexRouter} is to quickly route the IndexProcessor for a given
+ * series path. If the path is a full path, it can be found in O(1) in {@code
+ * fullPathProcessorMap}; otherwise, every key in {@code wildCardProcessorMap} must be traversed.
+ */
+public class ProtoIndexRouter implements IIndexRouter {
+
+  private static final Logger logger = LoggerFactory.getLogger(ProtoIndexRouter.class);
+
+  /** for subsequence matching indexes */
+  private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+  /** for whole matching indexes */
+  private Map<PartialPath, IndexProcessorStruct> wildCardProcessorMap;
+
+  private Map<String, Set<String>> sgToFullPathMap;
+  private Map<String, Set<PartialPath>> sgToWildCardPathMap;
+  private MManager mManager;
+  private File routerFile;
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private boolean unmodifiable;
+
+  /**
+   * Creates a modifiable router whose metadata file is named {@code router} and lives directly
+   * under the given directory.
+   *
+   * @param routerFileDir the directory that contains (or will contain) the router file
+   */
+  ProtoIndexRouter(String routerFileDir) {
+    this(false);
+    mManager = MManager.getInstance();
+    String routerFilePath = routerFileDir + File.separator + "router";
+    this.routerFile = SystemFileFactory.INSTANCE.getFile(routerFilePath);
+  }
+
+  /**
+   * Initialises the four lookup maps as empty concurrent maps. {@code mManager} and {@code
+   * routerFile} are left unset by this constructor.
+   *
+   * @param unmodifiable when true, add/remove requests on this router are rejected
+   */
+  private ProtoIndexRouter(boolean unmodifiable) {
+    // processor lookup tables
+    fullPathProcessorMap = new ConcurrentHashMap<>();
+    wildCardProcessorMap = new ConcurrentHashMap<>();
+    // storage group -> index series tables
+    sgToFullPathMap = new ConcurrentHashMap<>();
+    sgToWildCardPathMap = new ConcurrentHashMap<>();
+    this.unmodifiable = unmodifiable;
+  }
+
+  /**
+   * Writes the router metadata to {@code routerFile}.
+   *
+   * <p>Layout: the number of full-path entries followed by those entries, then the number of
+   * wildcard entries followed by those entries. Each entry is the series path, the number of index
+   * infos and the serialized infos themselves.
+   *
+   * <p>An {@link IOException} is logged and not rethrown. When {@code closeProcessor} is true, all
+   * four lookup maps are cleared afterwards, whether or not writing succeeded.
+   *
+   * @param closeProcessor true to close each non-null processor right after its entry is written
+   */
+  @Override
+  public void serialize(boolean closeProcessor) {
+    try (OutputStream outputStream = new FileOutputStream(routerFile)) {
+      ReadWriteIOUtils.write(fullPathProcessorMap.size(), outputStream);
+      for (Entry<String, IndexProcessorStruct> fullEntry : fullPathProcessorMap.entrySet()) {
+        serializeStruct(fullEntry.getKey(), fullEntry.getValue(), closeProcessor, outputStream);
+      }
+
+      ReadWriteIOUtils.write(wildCardProcessorMap.size(), outputStream);
+      for (Entry<PartialPath, IndexProcessorStruct> wildEntry : wildCardProcessorMap.entrySet()) {
+        String wildcardSeries = wildEntry.getKey().getFullPath();
+        serializeStruct(wildcardSeries, wildEntry.getValue(), closeProcessor, outputStream);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize router. Given up.", e);
+    }
+    if (!closeProcessor) {
+      return;
+    }
+    fullPathProcessorMap.clear();
+    sgToFullPathMap.clear();
+    sgToWildCardPathMap.clear();
+    wildCardProcessorMap.clear();
+  }
+
+  /** Writes one router entry and, if requested, closes its processor afterwards. */
+  private void serializeStruct(
+      String seriesPath,
+      IndexProcessorStruct struct,
+      boolean closeProcessor,
+      OutputStream outputStream)
+      throws IOException {
+    ReadWriteIOUtils.write(seriesPath, outputStream);
+    ReadWriteIOUtils.write(struct.infos.size(), outputStream);
+    for (IndexInfo info : struct.infos.values()) {
+      info.serialize(outputStream);
+    }
+    if (closeProcessor && struct.processor != null) {
+      struct.processor.close(false);
+    }
+  }
+
+  /**
+   * Reads the router file, if it exists, and registers every stored index again through {@link
+   * #addIndexIntoRouter} without re-serializing. The file holds two sections with the same layout:
+   * first the full-path entries, then the wildcard entries.
+   *
+   * <p>A {@link MetadataException} or {@link IOException} is logged and aborts the reload; entries
+   * registered before the failure stay registered.
+   *
+   * @param func the function used to create the processor of each reloaded index
+   */
+  @Override
+  public void deserializeAndReload(CreateIndexProcessorFunc func) {
+    if (!routerFile.exists()) {
+      return;
+    }
+    try (InputStream inputStream = new FileInputStream(routerFile)) {
+      // section 1: full-path index series
+      reloadSection(inputStream, func);
+      // section 2: wildcard index series
+      reloadSection(inputStream, func);
+    } catch (MetadataException | IOException e) {
+      logger.error("Error when deserialize router. Given up.", e);
+    }
+  }
+
+  /** Reads one section (entry count, then the entries) and registers each stored index info. */
+  private void reloadSection(InputStream inputStream, CreateIndexProcessorFunc func)
+      throws MetadataException, IOException {
+    int seriesCount = ReadWriteIOUtils.readInt(inputStream);
+    for (int seriesIdx = 0; seriesIdx < seriesCount; seriesIdx++) {
+      String indexSeries = ReadWriteIOUtils.readString(inputStream);
+      int infoCount = ReadWriteIOUtils.readInt(inputStream);
+      for (int infoIdx = 0; infoIdx < infoCount; infoIdx++) {
+        IndexInfo indexInfo = IndexInfo.deserialize(inputStream);
+        addIndexIntoRouter(new PartialPath(indexSeries), indexInfo, func, false);
+      }
+    }
+  }
+
+  /**
+   * Builds an unmodifiable router that holds only the processor entries recorded for the given
+   * storage group. The entries are shared with this router, not copied. The storage-group maps of
+   * the returned router are left empty.
+   *
+   * @param storageGroupPath the path of a storage group
+   * @return a new router restricted to that storage group; empty if nothing is recorded for it
+   */
+  @Override
+  public IIndexRouter getRouterByStorageGroup(String storageGroupPath) {
+    lock.readLock().lock();
+    try {
+      ProtoIndexRouter subRouter = new ProtoIndexRouter(true);
+
+      Set<PartialPath> wildcardSeries = sgToWildCardPathMap.get(storageGroupPath);
+      if (wildcardSeries != null) {
+        for (PartialPath wildcardPath : wildcardSeries) {
+          IndexProcessorStruct struct = this.wildCardProcessorMap.get(wildcardPath);
+          subRouter.wildCardProcessorMap.put(wildcardPath, struct);
+        }
+      }
+
+      Set<String> fullPathSeries = sgToFullPathMap.get(storageGroupPath);
+      if (fullPathSeries != null) {
+        for (String fullPath : fullPathSeries) {
+          IndexProcessorStruct struct = this.fullPathProcessorMap.get(fullPath);
+          subRouter.fullPathProcessorMap.put(fullPath, struct);
+        }
+      }
+      return subRouter;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public int getIndexNum() {
+    return fullPathProcessorMap.size() + wildCardProcessorMap.size();
+  }
+
+  /**
+   * Looks up the entry registered for the query path. Full paths are looked up in {@code
+   * fullPathProcessorMap}, all other paths in {@code wildCardProcessorMap} by exact key.
+   *
+   * @param partialPath the query path
+   * @param indexType not used by this implementation
+   * @param context not used by this implementation
+   * @return the entry registered for the path
+   * @throws QueryIndexException if no entry is registered for the path, or its processor is null
+   */
+  @Override
+  public IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException {
+    lock.readLock().lock();
+    try {
+      final IndexProcessorStruct found;
+      if (partialPath.isFullPath()) {
+        String seriesName = partialPath.getFullPath();
+        found = fullPathProcessorMap.get(seriesName);
+        if (found == null) {
+          throw new QueryIndexException("haven't create index on " + seriesName);
+        }
+      } else {
+        found = wildCardProcessorMap.get(partialPath);
+        if (found == null) {
+          throw new QueryIndexException("haven't create index on " + partialPath);
+        }
+      }
+      if (found.processor == null) {
+        throw new QueryIndexException("Index hasn't insert any data");
+      }
+      // the index processor itself is not locked here
+      return found;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * No-op in this implementation: none of the arguments are read.
+   *
+   * @param indexSeries the query path
+   * @param indexType the index type
+   * @param context the query context
+   */
+  @Override
+  public void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context) {
+    // do nothing.
+  }
+
+  /**
+   * Registers an index on the given series. If the series has no entry yet, a new processor is
+   * created through {@code func}; otherwise the index info is added to the existing entry. In both
+   * cases the processor is refreshed with the entry's index infos and the series is recorded under
+   * its storage group.
+   *
+   * <p>All work that can throw, including the storage-group lookup, runs inside the {@code
+   * try/finally} so the write lock is always released. {@code doSerialize} is honoured for
+   * full-path and wildcard series alike.
+   *
+   * @param partialPath the index series; {@code toLowerCase()} is invoked on it
+   * @param indexInfo the index information
+   * @param func a function to create a new IndexProcessor if the series has none yet
+   * @param doSerialize true to serialize the router immediately after the index is added
+   * @return always true when no exception is thrown
+   * @throws MetadataException if this router is unmodifiable, if the storage group cannot be
+   *     resolved, or if a wildcard series matches no time series
+   */
+  @Override
+  public boolean addIndexIntoRouter(
+      PartialPath partialPath,
+      IndexInfo indexInfo,
+      CreateIndexProcessorFunc func,
+      boolean doSerialize)
+      throws MetadataException {
+    if (unmodifiable) {
+      throw new MetadataException("cannot add index to unmodifiable router");
+    }
+    partialPath.toLowerCase();
+    lock.writeLock().lock();
+    try {
+      IndexType indexType = indexInfo.getIndexType();
+      // resolve the storage group that owns the index series
+      StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+      String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+
+      if (partialPath.isFullPath()) {
+        String fullPath = partialPath.getFullPath();
+        IndexProcessorStruct struct = fullPathProcessorMap.get(fullPath);
+        if (struct == null) {
+          Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+          infoMap.put(indexType, indexInfo);
+          IndexProcessor processor = func.act(partialPath, infoMap);
+          struct = new IndexProcessorStruct(processor, partialPath, infoMap);
+          fullPathProcessorMap.put(fullPath, struct);
+        } else {
+          struct.infos.put(indexType, indexInfo);
+        }
+        struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+
+        // record the series under its storage group
+        sgToFullPathMap.computeIfAbsent(storageGroupPath, sg -> new HashSet<>()).add(fullPath);
+      } else {
+        IndexProcessorStruct struct = wildCardProcessorMap.get(partialPath);
+        if (struct == null) {
+          Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+          infoMap.put(indexType, indexInfo);
+          IndexProcessor processor = func.act(partialPath, infoMap);
+          PartialPath representativePath = getRepresentativePath(partialPath);
+          struct = new IndexProcessorStruct(processor, representativePath, infoMap);
+          wildCardProcessorMap.put(partialPath, struct);
+        } else {
+          struct.infos.put(indexType, indexInfo);
+        }
+        struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+
+        // record the series under its storage group
+        sgToWildCardPathMap
+            .computeIfAbsent(storageGroupPath, sg -> new HashSet<>())
+            .add(partialPath);
+      }
+      if (doSerialize) {
+        serialize(false);
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Picks the first time series returned by {@code MManager.getAllTimeseriesPathWithAlias} for the
+   * wildcard path.
+   *
+   * <p>NOTE(review): the arguments {@code 1, 0} are presumably limit and offset — confirm against
+   * MManager.
+   *
+   * @param wildcardPath the wildcard index series
+   * @return the first matching time series path
+   * @throws MetadataException if the lookup fails or returns no path
+   */
+  private PartialPath getRepresentativePath(PartialPath wildcardPath) throws MetadataException {
+    Pair<List<PartialPath>, Integer> lookup =
+        mManager.getAllTimeseriesPathWithAlias(wildcardPath, 1, 0);
+    List<PartialPath> matched = lookup.left;
+    if (!matched.isEmpty()) {
+      return matched.get(0);
+    }
+    throw new MetadataException("Please create at least one series before create index");
+  }
+
+  /**
+   * Removes one index type from the given series. When the series has no index left, its entry is
+   * dropped from the router and from the storage-group map, and its processor is closed. The
+   * router is serialized afterwards, even if no entry was registered for the series.
+   *
+   * <p>All work that can throw, including the storage-group lookup, runs inside the {@code
+   * try/finally} so the write lock is always released.
+   *
+   * @param partialPath the index series; {@code toLowerCase()} is invoked on it
+   * @param indexType the type of index to be removed
+   * @return always true when no exception is thrown
+   * @throws MetadataException if this router is unmodifiable or the storage group cannot be
+   *     resolved
+   * @throws IOException propagated from closing the processor
+   */
+  @Override
+  public boolean removeIndexFromRouter(PartialPath partialPath, IndexType indexType)
+      throws MetadataException, IOException {
+    if (unmodifiable) {
+      throw new MetadataException("cannot remove index from unmodifiable router");
+    }
+    partialPath.toLowerCase();
+    lock.writeLock().lock();
+    try {
+      // resolve the storage group that owns the index series
+      StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+      String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+
+      if (partialPath.isFullPath()) {
+        String fullPath = partialPath.getFullPath();
+        IndexProcessorStruct struct = fullPathProcessorMap.get(fullPath);
+        if (struct != null) {
+          struct.infos.remove(indexType);
+          struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+          if (struct.infos.isEmpty()) {
+            // guard against a storage group that has no recorded series
+            Set<String> sgSeries = sgToFullPathMap.get(storageGroupPath);
+            if (sgSeries != null) {
+              sgSeries.remove(fullPath);
+            }
+            fullPathProcessorMap.remove(fullPath);
+            struct.representativePath = null;
+            struct.processor.close(true);
+            struct.processor = null;
+          }
+        }
+      } else {
+        IndexProcessorStruct struct = wildCardProcessorMap.get(partialPath);
+        if (struct != null) {
+          struct.infos.remove(indexType);
+          struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+          if (struct.infos.isEmpty()) {
+            // guard against a storage group that has no recorded series
+            Set<PartialPath> sgSeries = sgToWildCardPathMap.get(storageGroupPath);
+            if (sgSeries != null) {
+              sgSeries.remove(partialPath);
+            }
+            wildCardProcessorMap.remove(partialPath);
+            struct.representativePath = null;
+            struct.processor.close(true);
+            struct.processor = null;
+          }
+        }
+      }
+      serialize(false);
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  @Override
+  public Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries) {
+    lock.readLock().lock();
+    try {
+      if (wildCardProcessorMap.containsKey(indexSeries)) {
+        return Collections.unmodifiableMap(wildCardProcessorMap.get(indexSeries).infos);
+      }
+      if (fullPathProcessorMap.containsKey(indexSeries.getFullPath())) {
+        return Collections.unmodifiableMap(
+            fullPathProcessorMap.get(indexSeries.getFullPath()).infos);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    return new HashMap<>();
+  }
+
+  @Override
+  public Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo() {
+    lock.readLock().lock();
+    List<IndexProcessorStruct> res =
+        new ArrayList<>(wildCardProcessorMap.size() + fullPathProcessorMap.size());
+    try {
+      wildCardProcessorMap.forEach((k, v) -> res.add(v));
+      fullPathProcessorMap.forEach((k, v) -> res.add(v));
+    } finally {
+      lock.readLock().unlock();
+    }
+    return res;
+  }
+
+  @Override
+  public Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries) {
+    lock.readLock().lock();
+    List<IndexProcessor> res = new ArrayList<>();
+    try {
+      if (fullPathProcessorMap.containsKey(timeSeries.getFullPath())) {
+        res.add(fullPathProcessorMap.get(timeSeries.getFullPath()).processor);
+      } else {
+        wildCardProcessorMap.forEach(
+            (k, v) -> {
+              if (k.matchFullPath(timeSeries)) {
+                res.add(v.processor);
+              }
+            });
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    return res;
+  }
+
+  /** @return the string form of every registered entry, one per line */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    for (IndexProcessorStruct struct : getAllIndexProcessorsAndInfo()) {
+      text.append(struct.toString());
+      text.append("\n");
+    }
+    return text.toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
new file mode 100644
index 0000000..fe42852
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Data that has already been flushed may later be changed by "unsequence data" or by deletion. As
+ * we cannot assume that every index technique supports deletion or update, the index framework
+ * introduces the concept of an "index usability range". Inside a time range marked as "index
+ * unusable", the correctness of the index's pruning phase is not guaranteed.
+ *
+ * <p>A natural solution is to send the data of an "index unusable" range directly to the
+ * post-processing phase (also called the refinement phase).
+ *
+ * <p>TODO The update of IIndexUsable caused by a finished "merge" has not been taken into account.
+ */
+public interface IIndexUsable {
+
+  /**
+   * Add a range where the index is usable.
+   *
+   * @param fullPath the path of the time series
+   * @param start start timestamp
+   * @param end end timestamp
+   */
+  void addUsableRange(PartialPath fullPath, long start, long end);
+
+  /**
+   * Subtract a range from where the index is usable.
+   *
+   * @param fullPath the path of the time series
+   * @param start start timestamp
+   * @param end end timestamp
+   */
+  void minusUsableRange(PartialPath fullPath, long start, long end);
+
+  /**
+   * The result format depends on the implementation: "sub-matching" ({@linkplain
+   * SubMatchIndexUsability}) or "whole-matching" ({@linkplain WholeMatchIndexUsability}).
+   *
+   * @return the range where the index is unusable.
+   */
+  Object getUnusableRange();
+
+  /**
+   * Write this usability information to the stream.
+   *
+   * @param outputStream the stream to write to
+   * @throws IOException if writing fails
+   */
+  void serialize(OutputStream outputStream) throws IOException;
+
+  /**
+   * Read the usability information from the stream into this object.
+   *
+   * @param inputStream the stream to read from
+   * @throws IllegalPathException if a stored path is illegal
+   * @throws IOException if reading fails
+   */
+  void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer: this class only exposes static methods
+    }
+
+    /**
+     * @param path the index series
+     * @return a new {@link SubMatchIndexUsability} for a full path, otherwise a new {@link
+     *     WholeMatchIndexUsability}
+     */
+    public static IIndexUsable createEmptyIndexUsability(PartialPath path) {
+      return path.isFullPath() ? new SubMatchIndexUsability() : new WholeMatchIndexUsability();
+    }
+
+    /**
+     * Creates the implementation that fits the path and fills it from the stream.
+     *
+     * @param path the index series, which decides the implementation
+     * @param inputStream the stream to read the usability information from
+     * @return the deserialized usability object
+     * @throws IOException if reading fails
+     * @throws IllegalPathException if a stored path is illegal
+     */
+    public static IIndexUsable deserializeIndexUsability(PartialPath path, InputStream inputStream)
+        throws IOException, IllegalPathException {
+      IIndexUsable usability = createEmptyIndexUsability(path);
+      usability.deserialize(inputStream);
+      return usability;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
new file mode 100644
index 0000000..6ae41d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is to record the index usable range for a single long series, which corresponds to the
+ * subsequence matching scenario.
+ *
+ * <p>The series path in the initialized parameters must be a full path (without wildcard
+ * characters).
+ *
+ * <p>It's not thread-safe.
+ */
+public class SubMatchIndexUsability implements IIndexUsable {
+
+  private int maxSizeOfUsableSegments;
+  private final boolean initAllUsable;
+  private RangeNode unusableRanges;
+  private int size;
+
+  /**
+   * Creates an instance whose segment limit is taken from the IoTDB configuration ({@code
+   * getDefaultMaxSizeOfUnusableSegments()}) and with {@code initAllUsable} set to true.
+   */
+  SubMatchIndexUsability() {
+    this(IoTDBDescriptor.getInstance().getConfig().getDefaultMaxSizeOfUnusableSegments(), true);
+  }
+
+  /**
+   * Creates the unusable-range list, which initially consists of a single node spanning {@code
+   * Long.MIN_VALUE} to {@code Long.MAX_VALUE} (everything unusable).
+   *
+   * @param maxSizeOfUsableSegments upper bound compared against {@code size} (the number of nodes
+   *     in the unusable-range list) before a node is split or a new node is inserted
+   * @param initAllUsable when true, the range {@code [0, Long.MAX_VALUE - 1]} is immediately
+   *     marked as usable
+   */
+  SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+    this.maxSizeOfUsableSegments = maxSizeOfUsableSegments;
+    this.initAllUsable = initAllUsable;
+    unusableRanges = new RangeNode(Long.MIN_VALUE, Long.MAX_VALUE, null);
+    size = 1;
+    if (initAllUsable) {
+      // the path argument is not read by addUsableRange, hence null
+      addUsableRange(null, 0, Long.MAX_VALUE - 1);
+    }
+  }
+
+  /**
+   * Marks the closed interval {@code [startTime, endTime]} as usable by removing it from the
+   * unusable-range list: nodes fully covered by the interval are unlinked, nodes partially
+   * covered are trimmed, and a node that strictly contains the interval is split in two.
+   *
+   * <p>If a split is required but {@code size} has already reached {@code
+   * maxSizeOfUsableSegments}, the node is left untouched, i.e. the interval stays unusable.
+   *
+   * @param fullPath not read by this implementation
+   * @param startTime first usable timestamp; {@code Long.MIN_VALUE} is replaced by {@code
+   *     Long.MIN_VALUE + 1}
+   * @param endTime last usable timestamp; {@code Long.MAX_VALUE} is replaced by {@code
+   *     Long.MAX_VALUE - 1}
+   */
+  @Override
+  public void addUsableRange(PartialPath fullPath, long startTime, long endTime) {
+    // simplify the problem
+    // (presumably keeps "startTime - 1" / "endTime + 1" below from overflowing - TODO confirm)
+    if (startTime == Long.MIN_VALUE) {
+      startTime = startTime + 1;
+    }
+    if (endTime == Long.MAX_VALUE) {
+      endTime = endTime - 1;
+    }
+    RangeNode node = locateIdxByTime(startTime);
+    RangeNode prevNode = node;
+    // walk forward over every unusable node that starts at or before endTime
+    while (node != null && node.start <= endTime) {
+      assert node.start <= node.end;
+      assert startTime <= endTime;
+      if (node.end < startTime) {
+        // the node lies entirely before the interval: skip it
+        prevNode = node;
+        node = node.next;
+      } else if (startTime <= node.start && node.end <= endTime) {
+        // the unusable range is covered.
+        prevNode.next = node.next;
+        node = node.next;
+        size--;
+      } else if (node.start <= startTime && endTime <= node.end) {
+        // the interval lies inside this node
+        if (node.start == startTime) {
+          // left aligned
+          node.start = endTime + 1;
+          prevNode = node;
+          node = node.next;
+        } else if (node.end == endTime) {
+          // right aligned
+          node.end = startTime - 1;
+          prevNode = node;
+          node = node.next;
+        }
+        // the unusable range is split. If it reaches the upper bound, not split it
+        else if (size < maxSizeOfUsableSegments) {
+          // newNode takes the part after the interval, node keeps the part before it
+          RangeNode newNode = new RangeNode(endTime + 1, node.end, node.next);
+          node.end = startTime - 1;
+          newNode.next = node.next;
+          node.next = newNode;
+          prevNode = newNode;
+          node = newNode.next;
+          size++;
+        } else {
+          // don't split, thus do nothing.
+          prevNode = node;
+          node = node.next;
+        }
+      } else if (startTime < node.start) {
+        // the interval overlaps only the beginning of the node: trim the node's start
+        node.start = endTime + 1;
+        prevNode = node;
+        node = node.next;
+      } else {
+        // the interval overlaps only the tail of the node: trim the node's end
+        node.end = startTime - 1;
+        prevNode = node;
+        node = node.next;
+      }
+    }
+  }
+
+  /**
+   * Marks the closed interval {@code [startTime, endTime]} as unusable by adding it to the
+   * unusable-range list, merging it with every node it overlaps or directly adjoins.
+   *
+   * <p>If the interval touches no existing node and {@code size} has already reached {@code
+   * maxSizeOfUsableSegments}, no node is added; instead the nearer neighbour is extended to cover
+   * the interval, which also marks the gap between them as unusable.
+   *
+   * @param fullPath not read by this implementation
+   * @param startTime first unusable timestamp; {@code Long.MIN_VALUE} is replaced by {@code
+   *     Long.MIN_VALUE + 1}
+   * @param endTime last unusable timestamp; {@code Long.MAX_VALUE} is replaced by {@code
+   *     Long.MAX_VALUE - 1}
+   */
+  @Override
+  public void minusUsableRange(PartialPath fullPath, long startTime, long endTime) {
+    // simplify the problem
+    if (startTime == Long.MIN_VALUE) {
+      startTime = startTime + 1;
+    }
+    if (endTime == Long.MAX_VALUE) {
+      endTime = endTime - 1;
+    }
+    RangeNode node = locateIdxByTime(startTime);
+    // the interval ends inside the located node: nothing to change
+    if (endTime <= node.end) {
+      return;
+    }
+    // NOTE(review): node.next is dereferenced below without a null check; this appears to rely on
+    // the located node never being the last node of the list at this point - confirm.
+    // add the unusable range into the list
+    RangeNode newNode;
+    if (startTime <= node.end + 1) {
+      // overlapping this node
+      node.end = endTime;
+      newNode = node;
+      node = newNode.next;
+    } else if (node.next.start <= endTime + 1) {
+      // overlapping or adjoining the following node: widen that node to include the interval
+      newNode = node.next;
+      if (startTime < newNode.start) {
+        newNode.start = startTime;
+      }
+      if (newNode.end < endTime) {
+        newNode.end = endTime;
+      }
+      node = newNode.next;
+    } else {
+      // non-overlap with former and latter nodes
+      if (size < maxSizeOfUsableSegments) {
+        // it doesn't overlap with latter node, new node is inserted
+        newNode = new RangeNode(startTime, endTime, node.next);
+        node.next = newNode;
+        size++;
+      } else {
+        // we cannot add more range, merge it with closer neighbor.
+        if (startTime - node.end < node.next.start - endTime) {
+          node.end = endTime;
+        } else {
+          node.next.start = startTime;
+        }
+      }
+      return;
+    }
+
+    // merge the overlapped list
+    // (every following node that starts at or before endTime + 1 is absorbed into newNode)
+    while (node != null && node.start <= endTime + 1) {
+      if (newNode.end < node.end) {
+        newNode.end = node.end;
+      }
+      // delete the redundant node
+      newNode.next = node.next;
+      node = node.next;
+      size--;
+    }
+  }
+
+  /**
+   * Converts every node of the unusable-range list, in list order, into a time filter covering
+   * that node's {@code [start, end]}.
+   *
+   * @return one filter per unusable node
+   */
+  @Override
+  public List<Filter> getUnusableRange() {
+    List<Filter> filters = new ArrayList<>();
+    for (RangeNode cur = this.unusableRanges; cur != null; cur = cur.next) {
+      filters.add(toFilter(cur.start, cur.end));
+    }
+    return filters;
+  }
+
+  /**
+   * Finds the last node, excluding the final node of the list, whose start time is strictly less
+   * than {@code timestamp}, using a naive linear scan of the linked list.
+   *
+   * <p>When the list holds a single node, that node is returned regardless of {@code timestamp}.
+   * Otherwise the result is {@code null} if the first node already starts at or after {@code
+   * timestamp}.
+   */
+  private RangeNode locateIdxByTime(long timestamp) {
+    RangeNode head = unusableRanges;
+    if (head.next == null) {
+      return head;
+    }
+    RangeNode found = null;
+    for (RangeNode cur = head; cur.next != null && cur.start < timestamp; cur = cur.next) {
+      found = cur;
+    }
+    return found;
+  }
+
+  /**
+   * Writes, in this order: {@code size}, {@code maxSizeOfUsableSegments}, and the unusable-range
+   * list as produced by {@code RangeNode.serialize}. {@code initAllUsable} is not written.
+   *
+   * @param outputStream the stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(size, outputStream);
+    ReadWriteIOUtils.write(maxSizeOfUsableSegments, outputStream);
+    RangeNode.serialize(unusableRanges, outputStream);
+  }
+
+  /**
+   * Replaces {@code size}, {@code maxSizeOfUsableSegments} and the unusable-range list with the
+   * values read from the stream, in the same order {@code serialize} writes them.
+   *
+   * @param inputStream the stream to read from
+   * @throws IOException if reading from the stream fails
+   */
+  @Override
+  public void deserialize(InputStream inputStream) throws IOException {
+    size = ReadWriteIOUtils.readInt(inputStream);
+    maxSizeOfUsableSegments = ReadWriteIOUtils.readInt(inputStream);
+    unusableRanges = RangeNode.deserialize(inputStream);
+  }
+
+  /** Renders {@code size} followed by every unusable node in list order. */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder(String.format("size:%d,", size));
+    for (RangeNode cur = unusableRanges; cur != null; cur = cur.next) {
+      text.append(cur.toString());
+    }
+    return text.toString();
+  }
+
+  /** Builds the filter {@code time >= startTime AND time <= endTime}. */
+  private static Filter toFilter(long startTime, long endTime) {
+    Filter lowerBound = TimeFilter.gtEq(startTime);
+    Filter upperBound = TimeFilter.ltEq(endTime);
+    return FilterFactory.and(lowerBound, upperBound);
+  }
+
+  public boolean hasUnusableRange() {
+    if (initAllUsable) {
+      return !(unusableRanges.end == 0
+          && unusableRanges.next != null
+          && unusableRanges.next.start == Long.MAX_VALUE - 1
+          && unusableRanges.next.next == null);
+    } else {
+      return size > 1;
+    }
+  }
+
+  /** A node of the singly linked list of unusable ranges; covers {@code [start, end]}. */
+  private static class RangeNode {
+
+    long start;
+    long end;
+    RangeNode next;
+
+    RangeNode(long start, long end, RangeNode next) {
+      this.start = start;
+      this.end = end;
+      this.next = next;
+    }
+
+    /** Renders the node as {@code [start,end],}, printing the extreme long values as MIN/MAX. */
+    @Override
+    public String toString() {
+      String left = start == Long.MIN_VALUE ? "MIN" : String.valueOf(start);
+      String right = end == Long.MAX_VALUE ? "MAX" : String.valueOf(end);
+      return "[" + left + "," + right + "],";
+    }
+
+    /**
+     * Writes the number of nodes reachable from {@code head}, followed by the {@code start} and
+     * {@code end} of each node in list order.
+     */
+    public static void serialize(RangeNode head, OutputStream outputStream) throws IOException {
+      int count = 0;
+      for (RangeNode cur = head; cur != null; cur = cur.next) {
+        count++;
+      }
+      ReadWriteIOUtils.write(count, outputStream);
+      for (RangeNode cur = head; cur != null; cur = cur.next) {
+        ReadWriteIOUtils.write(cur.start, outputStream);
+        ReadWriteIOUtils.write(cur.end, outputStream);
+      }
+    }
+
+    /**
+     * Reads a node count and then that many {@code (start, end)} pairs, linking them in reading
+     * order.
+     *
+     * @return the first node, or {@code null} when no node was read
+     */
+    public static RangeNode deserialize(InputStream inputStream) throws IOException {
+      int count = ReadWriteIOUtils.readInt(inputStream);
+      RangeNode first = null;
+      RangeNode last = null;
+      for (int i = 0; i < count; i++) {
+        long start = ReadWriteIOUtils.readLong(inputStream);
+        long end = ReadWriteIOUtils.readLong(inputStream);
+        RangeNode created = new RangeNode(start, end, null);
+        if (last == null) {
+          first = created;
+        } else {
+          last.next = created;
+        }
+        last = created;
+      }
+      return first;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
new file mode 100644
index 0000000..d35bb81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class is to record the index usable range for a list of short series, which corresponds to
+ * the whole matching scenario.
+ *
+ * <p>The series path involves wildcard characters. One series is marked as "index-usable" or
+ * "index-unusable".
+ *
+ * <p>It's not thread-safe.
+ */
+public class WholeMatchIndexUsability implements IIndexUsable {
+
+  private final Set<PartialPath> unusableSeriesSet;
+
+  WholeMatchIndexUsability() {
+    this.unusableSeriesSet = new HashSet<>();
+  }
+
+  @Override
+  public void addUsableRange(PartialPath fullPath, long start, long end) {
+    // do nothing temporarily
+  }
+
+  @Override
+  public void minusUsableRange(PartialPath fullPath, long start, long end) {
+    unusableSeriesSet.add(fullPath);
+  }
+
+  @Override
+  public Set<PartialPath> getUnusableRange() {
+    return Collections.unmodifiableSet(this.unusableSeriesSet);
+  }
+
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(unusableSeriesSet.size(), outputStream);
+    for (PartialPath s : unusableSeriesSet) {
+      ReadWriteIOUtils.write(s.getFullPath(), outputStream);
+    }
+  }
+
+  @Override
+  public void deserialize(InputStream inputStream) throws IllegalPathException, IOException {
+    int size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; i++) {
+      unusableSeriesSet.add(new PartialPath(ReadWriteIOUtils.readString(inputStream)));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index 1f006d9..fb19c9a 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -156,6 +156,15 @@ public class PartialPath extends Path implements Comparable<Path> {
     return true;
   }
 
+  public boolean isFullPath() {
+    for (String node : nodes) {
+      if (node.equals(IoTDBConstant.PATH_WILDCARD)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public String getFullPath() {
     if (fullPath != null) {
@@ -312,4 +321,13 @@ public class PartialPath extends Path implements Comparable<Path> {
     }
     return ret;
   }
+
+  /**
+   * Lower-cases this path in place: every node, the cached full path (rebuilt by joining the
+   * nodes with {@code TsFileConstant.PATH_SEPARATOR}), and the measurement / time-series aliases
+   * when they are set.
+   *
+   * <p>{@code Locale.ROOT} is used so that the result does not depend on the JVM's default locale
+   * (for example, under a Turkish locale the default conversion maps "I" to a dotless "ı").
+   */
+  public void toLowerCase() {
+    for (int i = 0; i < nodes.length; i++) {
+      nodes[i] = nodes[i].toLowerCase(java.util.Locale.ROOT);
+    }
+    fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
+    if (measurementAlias != null) {
+      measurementAlias = measurementAlias.toLowerCase(java.util.Locale.ROOT);
+    }
+    if (tsAlias != null) {
+      tsAlias = tsAlias.toLowerCase(java.util.Locale.ROOT);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 54a0bf7..db9d416 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -39,11 +39,15 @@ import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.QueryIdNotExsitException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -76,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
@@ -83,6 +88,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
@@ -193,6 +199,7 @@ public class PlanExecutor implements IPlanExecutor {
   protected IQueryRouter queryRouter;
   // for administration
   private IAuthorizer authorizer;
+  private IndexManager indexManager;
 
   private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
 
@@ -200,6 +207,7 @@ public class PlanExecutor implements IPlanExecutor {
     queryRouter = new QueryRouter();
     try {
       authorizer = BasicAuthorizer.getInstance();
+      indexManager = IndexManager.getInstance();
     } catch (AuthException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -325,9 +333,9 @@ public class PlanExecutor implements IPlanExecutor {
       case STOP_TRIGGER:
         return operateStopTrigger((StopTriggerPlan) plan);
       case CREATE_INDEX:
-        throw new QueryProcessException("Create index hasn't been supported yet");
+        return createIndex((CreateIndexPlan) plan);
       case DROP_INDEX:
-        throw new QueryProcessException("Drop index hasn't been supported yet");
+        return dropIndex((DropIndexPlan) plan);
       case KILL:
         try {
           operateKillQuery((KillQueryPlan) plan);
@@ -479,7 +487,7 @@ public class PlanExecutor implements IPlanExecutor {
         GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
         queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
       } else if (queryPlan instanceof QueryIndexPlan) {
-        throw new QueryProcessException("Query index hasn't been supported yet");
+        queryDataSet = queryRouter.indexQuery((QueryIndexPlan) queryPlan, context);
       } else if (queryPlan instanceof AggregationPlan) {
         AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
         queryDataSet = queryRouter.aggregate(aggregationPlan, context);
@@ -1750,4 +1758,33 @@ public class PlanExecutor implements IPlanExecutor {
     }
     return noExistSg;
   }
+
+  /**
+   * Builds an {@link IndexInfo} from the plan (index type, time, properties) and asks the {@link
+   * IndexManager} to create the index on a copy of the plan's paths.
+   *
+   * @return always true when no exception is thrown
+   * @throws QueryProcessException an {@link IndexManagerException} wrapping any {@link
+   *     MetadataException} raised by the index manager
+   */
+  private boolean createIndex(CreateIndexPlan createIndexPlan) throws QueryProcessException {
+    List<PartialPath> targetPaths = new ArrayList<>(createIndexPlan.getPaths());
+    long startTime = createIndexPlan.getTime();
+    IndexInfo indexInfo =
+        new IndexInfo(createIndexPlan.getIndexType(), startTime, createIndexPlan.getProps());
+    try {
+      IndexManager.getInstance().createIndex(targetPaths, indexInfo);
+      return true;
+    } catch (MetadataException e) {
+      throw new IndexManagerException(e);
+    }
+  }
+
+  /**
+   * Asks the {@link IndexManager} to drop the index of the plan's type on a copy of the plan's
+   * paths.
+   *
+   * @return always true when no exception is thrown
+   * @throws QueryProcessException an {@link IndexManagerException} built from the {@link
+   *     MetadataException}, or from the message of the {@link IOException}, raised by the index
+   *     manager
+   */
+  private boolean dropIndex(DropIndexPlan dropIndexPlan) throws QueryProcessException {
+    List<PartialPath> targetPaths = new ArrayList<>(dropIndexPlan.getPaths());
+    IndexType typeToDrop = dropIndexPlan.getIndexType();
+    try {
+      IndexManager.getInstance().dropIndex(targetPaths, typeToDrop);
+      return true;
+    } catch (MetadataException e) {
+      throw new IndexManagerException(e);
+    } catch (IOException e2) {
+      throw new IndexManagerException(e2.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 27d20f1..9410c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -28,6 +28,7 @@ public class QueryIndexPlan extends RawDataQueryPlan {
 
   private Map<String, Object> props;
   private IndexType indexType;
+  private boolean alignedByTime = false;
 
   public QueryIndexPlan() {
     super();
@@ -50,6 +51,14 @@ public class QueryIndexPlan extends RawDataQueryPlan {
     this.props = props;
   }
 
+  /** Returns the {@code alignedByTime} flag of this plan (false unless it has been set). */
+  public boolean isAlignedByTime() {
+    return alignedByTime;
+  }
+
+  /** Sets the {@code alignedByTime} flag of this plan. */
+  public void setAlignedByTime(boolean alignedByTime) {
+    this.alignedByTime = alignedByTime;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
index aef7399..989aa10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.query.executor;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -65,4 +67,14 @@ public interface IQueryRouter {
   /** Execute UDTF query */
   QueryDataSet udtfQuery(UDTFPlan udtfPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException, InterruptedException;
+
+  /**
+   * Execute index query
+   *
+   * @param queryPlan the index query plan to execute
+   * @param context the context of the query
+   * @return the data set holding the query result
+   * @throws QueryIndexException declared for failures of the index query
+   * @throws StorageEngineException declared for failures of the storage engine
+   */
+  QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+      throws QueryIndexException, StorageEngineException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 3478c6c..fd67530 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -20,13 +20,16 @@
 package org.apache.iotdb.db.query.executor;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.read.QueryIndexExecutor;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -128,7 +131,7 @@ public class QueryRouter implements IQueryRouter {
 
     AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
 
-    QueryDataSet dataSet = null;
+    QueryDataSet dataSet;
 
     if (optimizedExpression != null
         && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
@@ -252,6 +255,13 @@ public class QueryRouter implements IQueryRouter {
     return lastQueryExecutor.execute(context, lastQueryPlan);
   }
 
+  /** Runs the plan through a freshly created {@link QueryIndexExecutor}. */
+  @Override
+  public QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+      throws QueryIndexException, StorageEngineException {
+    return new QueryIndexExecutor(queryPlan, context).executeIndexQuery();
+  }
+
   protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
     return new LastQueryExecutor(lastQueryPlan);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index c2368c4..7332945 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.index.IndexManager;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.control.TracingManager;
@@ -116,6 +117,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
+    registerManager.register(IndexManager.getInstance());
 
     registerManager.register(RPCService.getInstance());
     if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index b474b2c..0704e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -50,7 +50,10 @@ public enum ServiceType {
 
   FLUSH_SERVICE(
       "Flush ServerService", generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
-  CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor");
+  CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor"),
+
+  INDEX_SERVICE(
+      "Index ServerService", generateJmxName("org.apache.iotdb.db.index", "Index Manager"));
 
   private final String name;
   private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 952a86c..28142d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
 import org.apache.iotdb.db.qp.Planner;
@@ -58,6 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
@@ -667,6 +669,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setIgnoreTimeStamp(false);
       }
 
+      if (plan instanceof QueryIndexPlan) {
+        resp.setColumns(
+            newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+        resp.setDataTypeList(
+            newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+        resp.setColumnNameIndexMap(((IndexQueryDataSet) newDataSet).getPathToIndex());
+      }
       if (newDataSet instanceof DirectNonAlignDataSet) {
         resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
       } else {
@@ -847,6 +856,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
                       .getOutputDataType());
         }
         break;
+      case QUERY_INDEX:
+        // For query index, we don't know the columns before actual query.
+        // It will be deferred after obtaining query result set.
+        break;
       default:
         throw new TException("unsupported query type: " + plan.getOperatorType());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..82a521c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -571,4 +572,32 @@ public abstract class TVList {
   public long getLastTime() {
     return getTime(size - 1);
   }
+
+  /**
+   * Renders the list as {@code {[time,value],...}}. Only INT32, INT64, FLOAT and DOUBLE are
+   * supported (the floating-point types with two decimals); any other data type raises {@link
+   * NotImplementedException}.
+   */
+  @TestOnly
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("{");
+    for (int idx = 0; idx < size; idx++) {
+      TimeValuePair tv = getTimeValuePair(idx);
+      String entry;
+      switch (getDataType()) {
+        case INT32:
+          entry = String.format("[%d,%d],", tv.getTimestamp(), tv.getValue().getInt());
+          break;
+        case INT64:
+          entry = String.format("[%d,%d],", tv.getTimestamp(), tv.getValue().getLong());
+          break;
+        case FLOAT:
+          entry = String.format("[%d,%.2f],", tv.getTimestamp(), tv.getValue().getFloat());
+          break;
+        case DOUBLE:
+          entry = String.format("[%d,%.2f],", tv.getTimestamp(), tv.getValue().getDouble());
+          break;
+        default:
+          throw new NotImplementedException(getDataType().toString());
+      }
+      builder.append(entry);
+    }
+    return builder.append("}").toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..6f545b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -221,7 +221,8 @@ public class TsFileRecoverPerformer {
             new MemTableFlushTask(
                 recoverMemTable,
                 restorableTsFileIOWriter,
-                tsFileResource.getTsFile().getParentFile().getParentFile().getName());
+                tsFileResource.getTsFile().getParentFile().getParentFile().getName(),
+                sequence);
         tableFlushTask.syncFlushMemTable();
         tsFileResource.updatePlanIndexes(recoverMemTable.getMinPlanIndex());
         tsFileResource.updatePlanIndexes(recoverMemTable.getMaxPlanIndex());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index a1b5d55..0576236 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -69,7 +69,8 @@ public class MemTableFlushTaskTest {
         MemTableTestUtils.deviceId0,
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
-    MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+    MemTableFlushTask memTableFlushTask =
+        new MemTableFlushTask(memTable, writer, storageGroup, true);
     assertTrue(
         writer
             .getVisibleMetadataList(
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
new file mode 100644
index 0000000..a9fe55f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.it;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.math.Randomwalk;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test that creates {@code NO_INDEX} indexes through JDBC, writes and flushes data for
+ * a single series and for a wildcard set of series, and checks that the string form of the index
+ * router is unchanged after every step.
+ */
+public class NoIndexWriteIT {
+
+  private static final String insertPattern = "INSERT INTO %s(timestamp, %s) VALUES (%d, %.3f)";
+  private static final String createSeriesPattern =
+      "CREATE TIMESERIES %s WITH DATATYPE=FLOAT,ENCODING=PLAIN";
+  private static final String createIndexPattern = "CREATE INDEX ON %s WITH INDEX=%s";
+
+  private static final String storageGroupSub = "root.wind1";
+  private static final String storageGroupWhole = "root.wind2";
+
+  private static final String speed1 = "root.wind1.azq01.speed";
+  private static final String speed1Device = "root.wind1.azq01";
+  private static final String speed1Sensor = "speed";
+
+  private static final String directionDevicePattern = "root.wind2.%d";
+  private static final String directionPattern = "root.wind2.%d.direction";
+  private static final String directionSensor = "direction";
+
+  private static final String indexSub = speed1;
+  private static final String indexWhole = "root.wind2.*.direction";
+  private static final int wholeSize = 5;
+  private static final int wholeDim = 100;
+  private static final int subLength = 10_000;
+
+  /** Expected {@code toString()} of the router once both indexes have been created. */
+  private static final String expectedRouter =
+      "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+          + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+          + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+          + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n";
+
+  @Before
+  public void setUp() throws Exception {
+    // the index feature has to be switched on before the environment is started
+    IoTDBDescriptor.getInstance().getConfig().setEnableIndex(true);
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setEnableIndex(false);
+  }
+
+  /** Prints the current router and asserts that it still matches {@link #expectedRouter}. */
+  private static void assertRouterUnchanged() {
+    System.out.println(IndexManager.getInstance().getRouter());
+    Assert.assertEquals(expectedRouter, IndexManager.getInstance().getRouter().toString());
+  }
+
+  @Test
+  public void checkWrite() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      long start = System.currentTimeMillis();
+      statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupSub));
+      statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupWhole));
+
+      String createSpeed = String.format(createSeriesPattern, speed1);
+      System.out.println(createSpeed);
+      statement.execute(createSpeed);
+
+      for (int i = 0; i < wholeSize; i++) {
+        String wholePath = String.format(directionPattern, i);
+        statement.execute(String.format(createSeriesPattern, wholePath));
+      }
+      long startCreateIndex = System.currentTimeMillis();
+      statement.execute(String.format(createIndexPattern, indexSub, NO_INDEX));
+      statement.execute(String.format(createIndexPattern, indexWhole, NO_INDEX));
+      assertRouterUnchanged();
+
+      TVList subInput = Randomwalk.generateRanWalkTVList(subLength);
+      long startInsertSub = System.currentTimeMillis();
+      for (int i = 0; i < subInput.size(); i++) {
+        // Locale.ROOT keeps '.' as the decimal separator of %.3f; with the JVM default locale a
+        // decimal comma would yield a malformed INSERT statement.
+        statement.execute(
+            String.format(
+                java.util.Locale.ROOT,
+                insertPattern,
+                speed1Device,
+                speed1Sensor,
+                subInput.getTime(i),
+                subInput.getDouble(i)));
+      }
+      statement.execute("flush");
+      System.out.println("insert finish for subsequence case");
+      assertRouterUnchanged();
+
+      long startInsertWhole = System.currentTimeMillis();
+      TVList wholeInput = Randomwalk.generateRanWalkTVList(wholeDim * wholeSize);
+      // NOTE(review): wholeInput holds wholeDim * wholeSize points, but only one point is written
+      // to each of the wholeSize series -- confirm this is intended.
+      for (int i = 0; i < wholeSize; i++) {
+        String device = String.format(directionDevicePattern, i);
+        statement.execute(
+            String.format(
+                java.util.Locale.ROOT,
+                insertPattern,
+                device,
+                directionSensor,
+                wholeInput.getTime(i),
+                wholeInput.getDouble(i)));
+      }
+      statement.execute("flush");
+      long end = System.currentTimeMillis();
+      assertRouterUnchanged();
+      System.out.println("insert finish for whole matching case");
+      System.out.println(String.format("create series use %d ms", (startCreateIndex - start)));
+      System.out.println(
+          String.format("create index  use %d ms", (startInsertSub - startCreateIndex)));
+      System.out.println(
+          String.format("insert sub    use %d ms", (startInsertWhole - startInsertSub)));
+      System.out.println(String.format("insert whole  use %d ms", (end - startInsertWhole)));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
new file mode 100644
index 0000000..69af9e9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_SLIDE_STEP;
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_WINDOW_RANGE;
+import static org.apache.iotdb.db.index.common.IndexConstant.PAA_DIM;
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+
+public class ProtoIndexRouterTest {
+
+  private static final String storageGroupSub = "root.wind1";
+  private static final String storageGroupFull = "root.wind2";
+  private static final String speed1 = "root.wind1.azq01.speed";
+  private static final String direction1 = "root.wind2.1.direction";
+  private static final String direction2 = "root.wind2.2.direction";
+  private static final String direction3 = "root.wind2.3.direction";
+  private static final String index_sub = speed1;
+  private static final String index_full = "root.wind2.*.direction";
+  private IndexInfo infoFull;
+  private IndexInfo infoSub;
+  private CreateIndexProcessorFunc fakeCreateFunc;
+
+  private void prepareMManager() throws MetadataException {
+    MManager mManager = MManager.getInstance();
+    mManager.init();
+    mManager.setStorageGroup(new PartialPath(storageGroupSub));
+    mManager.setStorageGroup(new PartialPath(storageGroupFull));
+    mManager.createTimeseries(
+        new PartialPath(speed1),
+        TSDataType.FLOAT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    mManager.createTimeseries(
+        new PartialPath(direction1),
+        TSDataType.FLOAT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    mManager.createTimeseries(
+        new PartialPath(direction2),
+        TSDataType.FLOAT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    mManager.createTimeseries(
+        new PartialPath(direction3),
+        TSDataType.FLOAT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    Map<String, String> props_sub = new HashMap<>();
+    props_sub.put(INDEX_WINDOW_RANGE, "4");
+    props_sub.put(INDEX_SLIDE_STEP, "8");
+
+    Map<String, String> props_full = new HashMap<>();
+    props_full.put(PAA_DIM, "5");
+    this.infoSub = new IndexInfo(NO_INDEX, 0, props_sub);
+    this.infoFull = new IndexInfo(NO_INDEX, 5, props_full);
+    this.fakeCreateFunc =
+        (indexSeries, indexInfoMap) ->
+            new IndexProcessor(
+                indexSeries, testRouterDir + File.separator + "index_fake_" + indexSeries);
+  }
+
+  private static final String testRouterDir = "test_protoIndexRouter";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    if (!new File(testRouterDir).exists()) {
+      new File(testRouterDir).mkdirs();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    if (new File(testRouterDir).exists()) {
+      FileUtils.deleteDirectory(new File(testRouterDir));
+    }
+  }
+
+  @Test
+  public void testBasic() throws MetadataException, IOException {
+    prepareMManager();
+    ProtoIndexRouter router = new ProtoIndexRouter(testRouterDir);
+
+    router.addIndexIntoRouter(new PartialPath(index_sub), infoSub, fakeCreateFunc, true);
+    router.addIndexIntoRouter(new PartialPath(index_full), infoFull, fakeCreateFunc, true);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+            + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+            + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+            + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+        router.toString());
+    // select by storage group
+    ProtoIndexRouter sgRouters =
+        (ProtoIndexRouter) router.getRouterByStorageGroup(storageGroupFull);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+            + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n",
+        sgRouters.toString());
+
+    // serialize
+    router.serialize(false);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+            + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+            + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+            + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+        router.toString());
+    router.serialize(true);
+    Assert.assertEquals("", router.toString());
+
+    IIndexRouter newRouter = new ProtoIndexRouter(testRouterDir);
+    newRouter.deserializeAndReload(fakeCreateFunc);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+            + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+            + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+            + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+        newRouter.toString());
+
+    // delete index
+    newRouter.removeIndexFromRouter(new PartialPath(index_full), NO_INDEX);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+            + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+        newRouter.toString());
+    newRouter.removeIndexFromRouter(new PartialPath(index_full), NO_INDEX);
+    Assert.assertEquals(
+        "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+            + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n",
+        newRouter.toString());
+
+    newRouter.removeIndexFromRouter(new PartialPath(index_sub), NO_INDEX);
+    Assert.assertEquals("", newRouter.toString());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..b177a14
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class SubMatchIndexUsabilityTest { // checks toString() after add/minus usable range calls
+
+  @Test
+  public void addUsableRange1() throws IOException { // split and cover cases, then a round trip
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    // each disjoint usable range splits one more segment off the printed list
+    usability.addUsableRange(null, 1, 9);
+    Assert.assertEquals("size:2,[MIN,0],[10,MAX],", usability.toString());
+    System.out.println(usability);
+    usability.addUsableRange(null, 21, 29);
+    Assert.assertEquals("size:3,[MIN,0],[10,20],[30,MAX],", usability.toString());
+    System.out.println(usability);
+    usability.addUsableRange(null, 41, 49);
+    Assert.assertEquals("size:4,[MIN,0],[10,20],[30,40],[50,MAX],", usability.toString());
+    System.out.println(usability);
+    // the new usable range fully covers the segment [30,40], which disappears
+    usability.addUsableRange(null, 29, 45);
+    Assert.assertEquals("size:3,[MIN,0],[10,20],[50,MAX],", usability.toString());
+    System.out.println(usability);
+
+    // split the segment [10,20] into [10,13] and [18,20]
+    usability.addUsableRange(null, 14, 17);
+    Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability.toString());
+    System.out.println(usability);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability2.toString());
+  }
+
+  @Test
+  public void addUsableRange2() throws IOException { // partial overlaps, then a round trip
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    usability.addUsableRange(null, 1, 19);
+    usability.addUsableRange(null, 51, 59);
+    usability.addUsableRange(null, 81, 99);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[20,50],[60,80],[100,MAX],", usability.toString());
+    // overlap the left part of the segment [20,50]
+    usability.addUsableRange(null, 10, 29);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[30,50],[60,80],[100,MAX],", usability.toString());
+    // overlap the right part of the segment [60,80]
+    usability.addUsableRange(null, 71, 99);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[30,50],[60,70],[100,MAX],", usability.toString());
+    // cover several segments at once
+    usability.addUsableRange(null, 20, 80);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability.toString());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability2.toString());
+  }
+
+  @Test
+  public void minusUsableRange() throws IOException { // merge, extend, insert, then a round trip
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+
+    // nothing changes while the single node is still [MIN,MAX]
+    usability.minusUsableRange(null, 1, 19);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+    // the new range is already covered by the first node
+    usability.addUsableRange(null, 51, 89);
+    usability.minusUsableRange(null, 1, 19);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,50],[90,MAX],", usability.toString());
+
+    usability.addUsableRange(null, 101, 259);
+    // the new range extends a node's end time
+    usability.minusUsableRange(null, 51, 60);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,60],[90,100],[260,MAX],", usability.toString());
+
+    // the new range extends a node's start time
+    usability.minusUsableRange(null, 80, 89);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,60],[80,100],[260,MAX],", usability.toString());
+
+    // the new range is inserted as an individual node [120, 140]
+    usability.minusUsableRange(null, 120, 140);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+    // re-insert: the new range is identical to an existing node
+    usability.minusUsableRange(null, 120, 140);
+    System.out.println(usability);
+    Assert.assertTrue(usability.hasUnusableRange());
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+    // re-insert: the new range extends both sides of an existing node
+    usability.minusUsableRange(null, 110, 150);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[260,MAX],", usability.toString());
+
+    // an isolated range, but the number of segments has already reached the upper bound, so the
+    // range is merged with the closer neighbor.
+    usability.minusUsableRange(null, 200, 220);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[200,MAX],", usability.toString());
+
+    // the range covers several nodes.
+    usability.minusUsableRange(null, 50, 90);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,100],[110,150],[200,MAX],", usability.toString());
+
+    // the range covers several nodes.
+    usability.minusUsableRange(null, 105, 200);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,100],[105,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+    // fill the last gap: the two nodes collapse back into a single [MIN,MAX] node
+    usability.minusUsableRange(null, 101, 107);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    Assert.assertFalse(usability.hasUnusableRange());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability2.toString());
+  }
+
+  @Test
+  public void subMatchingToFilter() { // getUnusableRange() yields one time filter per segment
+    SubMatchIndexUsability a = new SubMatchIndexUsability(10, false);
+    Assert.assertFalse(a.hasUnusableRange());
+    a.addUsableRange(null, 31, 39);
+    a.addUsableRange(null, 61, 89);
+    System.out.println(a);
+
+    Assert.assertTrue(a.hasUnusableRange());
+    List<Filter> res = a.getUnusableRange();
+    Assert.assertEquals(3, res.size());
+    System.out.println(res);
+    Assert.assertEquals(
+        "[(time >= -9223372036854775808 && time <= 30), (time >= 40 && time <= 60), (time >= 90 && time <= 9223372036854775807)]",
+        res.toString());
+  }
+
+  @Test
+  public void minusUsableRange2() { // minus on a range lying inside the trailing [16,MAX] node
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    Assert.assertFalse(usability.hasUnusableRange());
+    usability.addUsableRange(null, 0, 15);
+    usability.minusUsableRange(null, 20, 24);
+    Assert.assertEquals("size:2,[MIN,-1],[16,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+    System.out.println(usability.toString());
+  }
+
+  @Test
+  public void reachUpBoundAndStopSplit() { // first constructor argument 2: size never exceeds 2
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(2, false);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+    usability.addUsableRange(null, 5, 7);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+    usability.addUsableRange(null, 2, 3); // would need a third segment, so nothing changes
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+    usability.minusUsableRange(null, 6, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[6,MAX],", usability.toString());
+    usability.addUsableRange(null, 6, 9);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[10,MAX],", usability.toString());
+    usability.minusUsableRange(null, 6, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,6],[10,MAX],", usability.toString());
+    usability.addUsableRange(null, 2, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,1],[10,MAX],", usability.toString());
+  }
+
+  @Test
+  public void testBoundCase() { // ranges spanning Long.MIN_VALUE..Long.MAX_VALUE
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(2, true);
+    usability.addUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+    System.out.println(usability);
+    Assert.assertEquals(
+        "size:2,[MIN,-9223372036854775808],[9223372036854775807,MAX],", usability.toString());
+    usability.minusUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..edf49fb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+/**
+ * Unit test for {@link WholeMatchIndexUsability}: series passed to {@code minusUsableRange} are
+ * reported by {@code getUnusableRange}, and that set survives a serialize/deserialize round trip.
+ */
+public class WholeMatchIndexUsabilityTest {
+
+  @Test
+  public void testMinusUsableRange() throws IllegalPathException, IOException {
+    WholeMatchIndexUsability usability = new WholeMatchIndexUsability();
+    // addUsableRange is expected to do nothing here: s10 and s11 never show up in the result
+    usability.addUsableRange(new PartialPath("root.sg.d.s10"), 1, 2);
+    usability.addUsableRange(new PartialPath("root.sg.d.s11"), 1, 2);
+
+    usability.minusUsableRange(new PartialPath("root.sg.d.s1"), 1, 2);
+    usability.minusUsableRange(new PartialPath("root.sg.d.s2"), 1, 2);
+    usability.minusUsableRange(new PartialPath("root.sg.d.s3"), 1, 2);
+    // NOTE(review): comparing Set#toString() depends on the set's iteration order -- confirm the
+    // implementation keeps it stable.
+    Set<PartialPath> ret = usability.getUnusableRange();
+    Assert.assertEquals("[root.sg.d.s3, root.sg.d.s2, root.sg.d.s1]", ret.toString());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    WholeMatchIndexUsability usable2 = new WholeMatchIndexUsability();
+    usable2.deserialize(in);
+    // inspect the deserialized instance; reading the original again would verify nothing
+    Set<PartialPath> ret2 = usable2.getUnusableRange();
+    Assert.assertEquals("[root.sg.d.s3, root.sg.d.s2, root.sg.d.s1]", ret2.toString());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index ab7ddf3..dd36009 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.index.IndexManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -121,6 +122,11 @@ public class EnvironmentUtils {
       fail();
     }
 
+    // clean index
+    if (config.isEnableIndex()) {
+      IndexManager.getInstance().deleteAll();
+    }
+
     IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
 
     // clean cache
@@ -211,6 +217,8 @@ public class EnvironmentUtils {
     cleanDir(config.getQueryDir());
     // delete tracing
     cleanDir(config.getTracingDir());
+    // delete index
+    cleanDir(config.getIndexRootFolder());
     // delete data files
     for (String dataDir : config.getDataDirs()) {
       cleanDir(dataDir);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f2fd3c1..a90dcad 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,6 +65,7 @@ public enum TSStatusCode {
 
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),
+  INDEX_QUERY_ERROR(423),
 
   INTERNAL_SERVER_ERROR(500),
   CLOSE_OPERATION_ERROR(501),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 014d45a..02f1d16 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -128,6 +128,30 @@ public class ReadWriteIOUtils {
     }
   }
 
+  /**
+   * Write a string-to-string map to the stream: first the entry count, then each key followed by
+   * its value.
+   *
+   * @return the number of bytes accounted for: 4 for the entry count plus whatever the per-string
+   *     {@code write} calls report
+   */
+  public static int write(Map<String, String> map, OutputStream stream) throws IOException {
+    write(map.size(), stream);
+    // the entry count is accounted for as a 4-byte int
+    int written = 4;
+    for (Entry<String, String> pair : map.entrySet()) {
+      written += write(pair.getKey(), stream) + write(pair.getValue(), stream);
+    }
+    return written;
+  }
+
+  /**
+   * Read a string-to-string map from the stream: an int entry count followed by that many
+   * key/value string pairs.
+   */
+  public static Map<String, String> readMap(InputStream inputStream) throws IOException {
+    int entryCount = readInt(inputStream);
+    Map<String, String> result = new HashMap<>(entryCount);
+    for (int remaining = entryCount; remaining > 0; remaining--) {
+      // the key precedes its value in the stream
+      String key = readString(inputStream);
+      result.put(key, readString(inputStream));
+    }
+    return result;
+  }
+
   public static int write(Map<String, String> map, ByteBuffer buffer) {
     int length = 0;
     byte[] bytes;
@@ -701,6 +725,29 @@ public class ReadWriteIOUtils {
     return bytes;
   }
 
+  /**
+   * Read a length-prefixed byte sequence from an input stream. The first 4 bytes of {@code
+   * inputStream} are read as an int giving the number of payload bytes that follow.
+   *
+   * <p>{@link InputStream#read(byte[], int, int)} may return fewer bytes than requested even
+   * before the end of the stream, so the payload is read in a loop until it is complete.
+   *
+   * @param inputStream contains a length and a stream
+   * @return a buffer wrapping exactly the payload, with position 0 and limit equal to the length
+   * @throws IOException if the length prefix is negative, or if the stream ends before as many
+   *     bytes as the self description length have been read
+   */
+  public static ByteBuffer readByteBufferWithSelfDescriptionLength(InputStream inputStream)
+      throws IOException {
+    int length = readInt(inputStream);
+    if (length < 0) {
+      // a corrupted prefix would otherwise surface as NegativeArraySizeException
+      throw new IOException("Negative self description length: " + length);
+    }
+    byte[] bytes = new byte[length];
+    int readLen = 0;
+    while (readLen < length) {
+      int n = inputStream.read(bytes, readLen, length - readLen);
+      if (n < 0) {
+        // end of stream reached before the payload was complete
+        break;
+      }
+      readLen += n;
+    }
+    if (readLen != length) {
+      throw new IOException(String.format(RETURN_ERROR, length, readLen));
+    }
+    // wrap() yields position 0 and limit == length without copying the array again
+    return ByteBuffer.wrap(bytes);
+  }
+
   /** read bytes from buffer with offset position to the end of buffer. */
   public static int readAsPossible(TsFileInput input, long position, ByteBuffer buffer)
       throws IOException {