You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2021/03/02 11:01:49 UTC

[iotdb] branch index_manager_part2 created (now b0639d5)

This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a change to branch index_manager_part2
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b0639d5  Index framework: part 2

This branch includes the following new commits:

     new b0639d5  Index framework: part 2

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Index framework: part 2

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch index_manager_part2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b0639d57bc4e6901fd482e36c0a3a71c7e2387e9
Author: kr11 <3095717866.com>
AuthorDate: Tue Mar 2 19:00:04 2021 +0800

    Index framework: part 2
---
 docs/zh/SystemDesign/Index/Index.md                | 672 +++++++++++++++++++++
 .../resources/conf/iotdb-engine.properties         |   6 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  33 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  34 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +-
 .../db/exception/index/QueryIndexException.java    |   5 +
 ...ion.java => UnsupportedIndexFuncException.java} |  10 +-
 .../iotdb/db/index/IndexBuildTaskPoolManager.java  |  78 +++
 .../org/apache/iotdb/db/index/IndexManager.java    | 329 ++++++++++
 .../apache/iotdb/db/index/IndexManagerMBean.java   |  21 +
 .../iotdb/db/index/IndexMemTableFlushTask.java     |  75 +++
 .../org/apache/iotdb/db/index/IndexProcessor.java  | 514 ++++++++++++++++
 .../iotdb/db/index/algorithm/IoTDBIndex.java       | 182 ++++++
 .../apache/iotdb/db/index/algorithm/NoIndex.java   |  68 +++
 .../apache/iotdb/db/index/common/DistSeries.java   |  39 ++
 .../iotdb/db/index/common/IndexConstant.java       |  54 ++
 .../apache/iotdb/db/index/common/IndexInfo.java    | 137 +++++
 .../db/index/common/IndexProcessorStruct.java      |  52 ++
 .../apache/iotdb/db/index/common/IndexType.java    |  56 +-
 .../apache/iotdb/db/index/common/IndexUtils.java   |  75 +++
 .../common/func/CreateIndexProcessorFunc.java      |  39 ++
 .../iotdb/db/index/common/func/IndexNaiveFunc.java |  34 ++
 .../iotdb/db/index/common/math/Randomwalk.java     |  44 ++
 .../common/math/probability/Probability.java}      |  12 +-
 .../common/math/probability/UniformProba.java}     |  24 +-
 .../db/index/feature/IndexFeatureExtractor.java    | 106 ++++
 .../iotdb/db/index/read/IndexQueryDataSet.java     |  41 ++
 .../iotdb/db/index/read/QueryIndexExecutor.java    |  56 ++
 .../optimize/IIndexCandidateOrderOptimize.java     |  38 ++
 .../read/optimize/NoCandidateOrderOptimizer.java   |  21 +
 .../apache/iotdb/db/index/router/IIndexRouter.java | 129 ++++
 .../iotdb/db/index/router/ProtoIndexRouter.java    | 424 +++++++++++++
 .../apache/iotdb/db/index/usable/IIndexUsable.java |  98 +++
 .../db/index/usable/SubMatchIndexUsability.java    | 311 ++++++++++
 .../db/index/usable/WholeMatchIndexUsability.java  |  79 +++
 .../org/apache/iotdb/db/metadata/PartialPath.java  |  18 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  43 +-
 .../iotdb/db/qp/physical/crud/QueryIndexPlan.java  |   9 +
 .../iotdb/db/query/executor/IQueryRouter.java      |  12 +
 .../iotdb/db/query/executor/QueryRouter.java       |  12 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   5 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  13 +
 .../iotdb/db/utils/datastructure/TVList.java       |  29 +
 .../writelog/recover/TsFileRecoverPerformer.java   |   3 +-
 .../db/engine/memtable/MemTableFlushTaskTest.java  |   3 +-
 .../apache/iotdb/db/index/it/NoIndexWriteIT.java   | 162 +++++
 .../db/index/router/ProtoIndexRouterTest.java      | 179 ++++++
 .../index/usable/SubMatchIndexUsabilityTest.java   | 233 +++++++
 .../index/usable/WholeMatchIndexUsabilityTest.java |  55 ++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  47 ++
 54 files changed, 4700 insertions(+), 44 deletions(-)

diff --git a/docs/zh/SystemDesign/Index/Index.md b/docs/zh/SystemDesign/Index/Index.md
new file mode 100644
index 0000000..cf0fba9
--- /dev/null
+++ b/docs/zh/SystemDesign/Index/Index.md
@@ -0,0 +1,672 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# 相似性索引框架文档
+
+[TOC]
+
+## 第1章 任务目标
+
+### 背景
+
+在最近的二十年间,谷歌一系列重要工作的发表 (GFS ,BigTable ,Map-Reduce 等),以及相应的开源框架的兴起 (Hadoop,HBase 等) 推动了大数据技术的兴起和蓬勃发展。以BigTable,HBase,Cassandra 为代表的NoSQL数据库系统对新一代数据库技术产生了巨大的影响。在这一背景下,为了管理气象、医疗、工业物联网等领域产生的大量时间序列数据,时序数据库系统也得到了快速发展 。
+
+然而,现有数据库系统通常仅支持经典查询(点查询,范围查询和带值过滤条件的查询),常见的聚合查询(最大值,最小值,均值,求和等)和少量领域相关操作(如按时间间隔聚合,按时间戳对齐等)等等。
+在传统的关系型数据库系统中,如Oracle和MySQL,往往只集成了几种被广泛使用的索引技术,如用于加速范围查询的B树和加速等值查询的哈希表。很多当前主流的时序数据库系统(如InfluxDB,OpenTSDB、Apache IoTDB)仍不支持时间序列的相似性索引。InfluxDB提出了时间序列索引(Time Series Index,TSI)。但主要用于对海量时间序列的路径、标签等元信息的快速查找。在机器学习领域,开发者提出了许多索引代码库,但主要面向近似检索,同时并未与时序数据库系统进行集成。基于时序数据库系统的索引机制仍然是有待解决的问题。
+
+针对时序数据的相似性索引一直是非常重要的研究课题。研究者们提出了一系列索引结构,致力于解决全序列检索和子序列检索问题。然而,研究者提出的索引技术通常以独立项目的形式存在。在应用于生产场景时,需要全面考虑数据采集/存储、索引构建/查询接口、数据更新、数据容灾等问题。时序数据库系统具有规范化的查询语言、丰富而健壮的读写接口、高效的数据读写性能以及较为完善的容灾策略。将相似性索引技术集成到时序数据库系统中,不仅丰富了数据库系统的功能,同时也会简化索引在面对复杂应用场景时的适配工作,提升索引的易用性,有助于索引技术在生产实践中落地。然而,数据库系统是复杂的,与数据库系统的对接必须综合考虑SQL定义、执行计划、读写流程以及返回格式定义等技术细节。这些复杂的问题制约了索引与数据库系统的深度集成。
+
+
+
+#### 目标
+
+使IoTDB支持时间序列相似性检索功能,并集成包括复合模式索引、R树索引在内的多种索引技术。
+
+## 详细设计
+
+### 基本概念
+
+#### 什么是“相似性检索”
+
+相似性搜索是时间序列领域中最重要的方向之一。与IoTDB中现有的查询条件不同,相似性搜索输入一条“序列”作为查询,目的是高效地从数据库中找到一系列与“查询序列”相似的其他序列。
+
+所谓“相似”:两条序列之间的“相似性”取决于它们的“欧氏距离”或其他距离函数。
+
+如下图形象地展示了如何用“欧氏距离(Euclidean)”和“动态时间规整距离(DTW)”度量两条时间序列的相似性:
+
+![image-20210225015048345](https://user-images.githubusercontent.com/5548915/109539640-849a8b00-7afc-11eb-8c95-b666d6dac2ca.png)
+
+
+
+#### 什么是“特征索引”
+
+在关系型数据库中,一些经典的索引结构,如B树和哈希表,都是对关系表中的属性创建索引。而相似性索引的检索对象是高维序列。由于高维序列存在维度灾难问题,相似性索引需要首先将高维序列转换成低维特征,然后对低维特征创建索引。如果将关系型数据库系统中的B树和哈希表称为“基于属性的索引”,那么时序数据库系统中的相似性索引可以被视为“基于特征的索引”。
+
+数据库系统中的特征索引从逻辑上可以被划分为三层结构,自下而上依次是数据层、特征层和索引层:
+
+ * 数据层:对应着数据空间中的原始序列。 在IoTDB中,系统接收大量传感器数据并将其组织成TsFile文件格式写入磁盘;
+ * 特征层:对应着特征空间中的低维特征。 每条特征的大小通常远小于原始时间序列。 一条特征记录包含两部分信息:特征向量和指向数据文件的指针;
+ * 索引层:对应着索引结构。通过额外的内存或磁盘结构,索引结构可以对特征进行有效的组织。索引结构的大小通常远小于由它组织的特征和原始序列。在索引结构底层的索引项中包含了一个或多个指向特征的指针;
+
+在构造索引时,索引机制首先将数据层中的原始序列转换为特征,然后写入索引结构中。在执行查询时,索引机制将查询序列转换为特征,然后输入索引结构。通过索引结构和特征信息对被检索集进行剪枝,得到候选集,候选集中包含了一系列指向原始数据的指针;然后访问候选集所对应的原始数据,得到最终的查询结果。
+
+###  功能定义
+
+索引框架将满足两类使用者的需求:
+
+* 对数据库管理员和用户来说,索引机制设计了**索引创建、查询和删除的SQL语句**,并设计了与IoTDB其他查询相一致的返回结果格式。
+* 对索引实现者来说,索引机制暴露出一套与索引技术密切相关的接口,包括特征选型、索引插入和索引过滤等等。索引实现者可以在不必关心数据库系统技术细节的情况下,便捷地集成各种索引技术。
+
+#### 用户功能:面向用户和DBA
+
+> 注:面向用户和DBA的代码主要涉及SQL和查询计划的改动,这部分代码已经合并到主分支中,参见[PR-2024(IoTDB-804)](https://github.com/apache/iotdb/pull/2024)。
+
+本节将从两个工业场景中的示例出发,阐述两种检索场景(全序列检索和子序列检索)的使用方式。
+
+##### 全序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540349-4782c880-7afd-11eb-9c67-ae83697f77d3.png)
+
+上图展示了制药工厂在IoTDB中的数据库模式。为了简洁清晰,图中省略了部分与本节无关的信息。工厂包含多个发酵罐(Ferm01、Ferm02等等),每个发酵罐逐批次地进行红霉素发酵工作,每个批次持续时间为7天。对于每个批次(如Ferm02发酵罐的20191017批次),工厂会对发酵过程中的物理量进行记录,包括葡萄糖补糖率(Glu)、二氧化碳释放率(CER)、酸碱度(pH)等等。
+
+分析人员希望为所有发酵罐、所有批次的补糖率序列(Glu)创建基于“PAA特征的R树索引”(记为RTREE_PAA)。创建索引格式如下:
+
+```sql
+CREATE INDEX ON seriesPath
+WITH INDEX=indexName (, key=value)* 
+```
+
+除了必须指定索引名之外,还需要传入一系列创建索引所需的参数。在本案例中,创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Ery.*.Glu 
+WITH INDEX=RTREE_PAA, PAA_DIM=8
+```
+
+索引类别为RTREE_PAA,即基于PAA特征的R树索引:将时间序列转换为8维PAA特征,然后交由R树组织。
+
+创建索引语句不限制索引参数的数量和格式,以此保证良好的可拓展性。用户提交的参数将由索引注册表记录。当索引实例被加载到内存中时,索引机制会将参数信息传给索引实例。
+
+分析人员希望在所有发酵罐的所有批次中,寻找与某种“补糖策略”最接近的2条补糖率曲线,查询语句如下:
+
+```sql
+SELECT TOP 2 Glu 
+FROM root.Ery.* 
+WHERE Glu LIKE (0, 120, 20, 80, ..., 120, 100, 80, 0)
+```
+
+通过关键字LIKE,IoTDB的查询处理器将这条查询语句交给索引机制执行。
+为了与IoTDB的经典查询结果格式保持一致,全序列检索的每一条结果单独成列,并按照时间戳顺序对齐。如下图所示,与给定查询序列最接近的两条序列分别是发酵罐Ferm01的20191018批次和发酵罐Ferm02的20191024批次。两条序列按照时间戳对齐。
+
+![image](https://user-images.githubusercontent.com/5548915/109540059-f70b6b00-7afc-11eb-938e-d921a9a5d667.png)
+
+索引机制允许用户删除一个被创建的索引,删除索引格式如下:
+
+```sql
+DROP INDEX indexName ON seriesPath
+```
+
+删除索引的格式非常简单。在本例中,分析人员通过以下语句删除索引:
+
+```sql
+DROP INDEX RTREE_PAA ON root.Ery.*.Glu 
+```
+
+##### 子序列检索
+
+![image](https://user-images.githubusercontent.com/5548915/109540439-65502d80-7afd-11eb-87d6-cf9b7e46ecfd.png)
+
+上图展示了风力发电厂在IoTDB中的数据库模式。风电厂包含多个风机(AZQ01、AZQ02等等),每个风机对风机状态和周围环境进行持续监测。监测传感器包括风速(Speed)、风向(Direction)、风机功率(Power)等等。
+
+分析人员希望为风机AZQ02的风速传感器序列(Speed)创建“等长数据块索引”。创建索引语句为:
+
+```sql
+CREATE INDEX ON root.Wind.AZQ02.Speed
+WITH INDEX=ELB_INDEX, Block_Size=5
+```
+
+创建索引后,分析人员希望在AZQ02风机的风速序列中寻找与"极端运行阵风"相似的子序列。其中极端运行阵风为复合模式,在风速高峰期阈值要求小于4,在其他区间段阈值要求小于2。查询语句如下:
+
+```sql
+SELECT Speed.* FROM root.Wind.AZQ02
+WHERE Speed 
+CONTAIN (15, 14, 12, ..., 12, 12, 11) WITH TOLERANCE 2
+CONCAT  (10, 20, 25, ..., 24, 14, 8) WITH TOLERANCE 4
+CONCAT  (8, 9, 10, ..., 14, 15, 15) WITH TOLERANCE 2
+```
+
+通过 CONTAIN 和 WITH TOLERANCE等关键字,IoTDB的查询处理器将这条查询语句交给索引机制执行,索引机制进一步调用相应的ELB索引。
+
+子序列检索的结果是长序列上的子片段,原本并不存在于数据库模式中。为了与IoTDB的其他查询结果格式保持一致,每一个检索结果单独成列,列名由“传感器序列名”和“子片段起始时间”连缀而成。如下图所示,复合模式的检索结果共两条,分别从时刻2019-10-18 12:30:00.000和2019-10-18 12:30:10.000开始。
+
+![image](https://user-images.githubusercontent.com/5548915/109539955-db07c980-7afc-11eb-8848-41983d393f45.png)
+
+删除索引操作与全序列检索场景类似,不再赘述。
+
+#### 用户功能:面向索引实现者
+
+索引机制致力于屏蔽数据库系统复杂的运行逻辑和消息处理,为开发者提供简洁而友好的索引集成平台。
+
+如果想为IoTDB添加一种新的索引技术,索引实现者需要继承`IoTDBIndex` 类,并在`IndexType` 这一枚举类中添加新的索引类型(“以硬编码的方式向枚举类中添加索引”仍然不够优雅,我们会在未来的版本中用其他方式取代)。
+
+##### IoTDBIndex
+
+`IoTDBIndex`是一个抽象类,一些关键的属性和抽象方法如下:
+
+```java
+// 本索引的特征处理器。所有特征处理器都继承自IndexFeatureExtractor类
+protected IndexFeatureExtractor indexFeatureExtractor;
+
+/**
+  为本索引使用的特征提取器进行初始化操作
+  previous:如果特征处理器在上一次系统关闭时存储了一些状态信息,这些状态信息会以previous参数传入
+  inQueryMode:用于索引写入还是索引查询。索引可以在两种场景中使用不同的特征提取器。
+  */
+public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+// 从特征提取器中获取最新产生的序列和特征信息,写入索引。
+public abstract boolean buildNext() throws IndexManagerException;
+
+// 系统关闭时,将内存中的索引数据序列化到磁盘
+protected abstract void flushIndex();
+
+// 执行索引查询。
+public abstract QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexRefinePhaseOptimize refinePhaseOptimizer,
+      boolean alignedByTime)
+      throws QueryIndexException;
+
+```
+
+##### IndexType
+
+每种索引都有自己的唯一类型`IndexType`。当索引实现者希望添加一种索引时,需要在`IndexType`中增加一种类型,并为以下函数添加一个新分支:
+
+```java
+public static IndexType deserialize(short i);
+
+public short serialize();
+
+private static IoTDBIndex newIndexByType(PartialPath path, TSDataType tsDataType, String indexDir, IndexType indexType, IndexInfo indexInfo);
+```
+
+##### IndexFeatureExtractor
+
+> 在索引框架的第二次PR中暂不涉及IndexFeatureExtractor的具体实现。
+
+在全序列特征提取器中,特征提取器接收一条原始数据作为输入,从中提取出三种信息,以应对索引生成和查询过程中存在的各种需求。三种信息从简单到复杂分别为:
+
+1. L1-序列指针:包含三元组信息:序列路径、起始时间戳和结束时间戳。
+2. L2-规整序列:将不定频、有缺失数据的序列转换为维度固定或时间间隔整齐的序列,供索引处理。
+3. L3-序列特征:基于规整序列进一步计算得到的特征。格式不固定。
+
+以上三种信息分别在索引读写过程中起到不同的作用。序列指针是指向原始数据的指针。
+由于IoTDB会在形成TsFile文件时对原始数据进行编码和压缩,因此无法简单地通过"文件路径+偏移量"来获取原始数据。取而代之的是,根据序列名和起止时间这三元组信息来唯一确定一段原始数据。序列指针会在特征文件和候选集中被使用;
+规整序列是多数索引可以接受的输入形式。在实验条件下,索引结构可以认为时序数据维度固定、时间戳等间隔且没有缺失值。但在实际情况中,序列可能存在微小的时间戳偏移或少量的数据错漏,规整序列对原始序列进行了预处理,去除了上述数据缺陷。
+序列特征是索引对原始序列进行特征提取后的结果。特征提取方法往往是索引技术的关键创新所在。
+
+如果索引实现者希望自定义实现特征提取器 `IndexFeatureExtractor` ,需要实现以下接口:
+
+```java
+// 加入一段原始数据以备处理
+public abstract void appendNewSrcData(TVList newData);
+
+// 加入一段原始数据以备处理,由于代码历史原因,IoTDB在数据写入时生成TVList,而查询时却会得到BatchData
+public abstract void appendNewSrcData(BatchData newData);
+
+// 索引框架会首先询问是否有下一条数据
+public abstract boolean hasNext();
+
+// 如果有,调用本方法产生一个待插入数据。注意,在appendNewSrcData中插入一段原始数据后,可能产生多条待插入数据
+public abstract void processNext();
+
+// 处理了一批数据后,调用本方法来释放已经处理过的数据的资源
+public abstract void clearProcessedSrcData();
+
+// 关闭特征处理器,返回状态数据,用于下一次打开时恢复状态
+public abstract ByteBuffer closeAndRelease() throws IOException;
+
+// 返回 L2-规整数据
+public Object getCurrent_L2_AlignedSequence();
+
+// 返回 L3-特征
+public Object getCurrent_L3_Feature();
+```
+
+
+
+#### 系统参数配置
+
+系统参数如下:
+
+| 参数名                                | 类型   | 默认值      | 说明                                                 |
+| ------------------------------------- | ------ | ----------- | ---------------------------------------------------- |
+| enable_index                          | bool   | false       | 是否开启索引                                         |
+| index_root_dir                        | string | data/index  | 所有索引文件的总目录                                 |
+| concurrent_index_build_thread         | int    | 系统CPU核数 | 执行索引写入时的最大并行线程数量                     |
+| default_index_window_range            | int    | 10          | 目前未用到,拟删除                                   |
+| index_buffer_size                     | long   | 134217728   | 目前未用到,拟删除                                   |
+| max_index_query_result_size           | int    | 5           | 索引检索结果的最大返回数量                           |
+| default_max_size_of_unusable_segments | int    | 20          | 可用区间管理器的最大段数,见`SubMatchIndexUsability` |
+
+### 模块设计
+
+下图展示了IoTDB索引机制的模块设计图。索引机制会与IoTDB中的多个模块(绿色框图)产生消息和数据交互(绿色箭头)。例如,在IoTDB系统启动和关闭时会启动或关闭索引机制;当某条序列有新的数据点写入时,IoTDB存储管理器会调用索引机制采取相应操作;当用户发起索引查询时,IoTDB查询处理器会将查询条件转交给索引机制,获取索引查询结果并返回给用户。
+
+![image-20210227025206792](https://user-images.githubusercontent.com/5548915/109539696-92501080-7afc-11eb-8621-a7d4cabcd94c.png)
+
+索引机制内部的各个模块(蓝色框图)也会产生消息和数据交互。索引实现者可以继承指定接口为索引机制添加新的索引实例。通过这些接口,索引机制会调用索引实例完成索引写入和查询等功能(蓝色箭头)。
+
+下面分别介绍各个模块的功能和接口。
+
+#### 特征提取器
+
+对应代码类:`IndexFeatureExtractor` 及其子类。
+
+特征提取器负责对IoTDB的原始序列进行预处理和特征提取。在索引写入阶段,对于全序列检索,特征提取器对整条序列进行特征提取;对于子序列检索,特征提取器首先利用滑动窗口模型将长序列截取为短片段,然后提取特征。特征方法和相关参数在创建索引的时候指定;在索引查询阶段,特征提取器为查询序列提取特征。由于索引技术细节的不同,查询阶段的特征方法可能与写入阶段的特征方法相同或不同。
+系统已经内置了几种较为常见的特征提取器。索引实现者可以直接选用内置的特征提取器,或者实现自定义的新特征提取器。特征提取器的接口详见2.2.2.3节:IndexFeatureExtractor。
+
+#### 索引调度器
+
+首先介绍“索引序列”概念:"索引序列"是一个索引实例覆盖的"序列"或"序列集合"。一个索引序列所覆盖的所有数据会写入同一个索引实例中。例如,在全序列索引的例子中,`root.Ery.*.Glu ` 就是这个索引的“索引序列”,而在子序列索引的例子中,`root.Wind.AZQ02.Speed` 就是ELB索引的“索引序列”。
+
+每个索引在创建的时候会指定它的“索引序列”,索引序列可能包含一条或多条时间序列。两个“索引序列”之间可能有交集。一个索引序列上可能会创建多种索引。
+
+索引、索引序列和时间序列的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540557-9a5c8000-7afd-11eb-9be3-ede027e81bb2.png)
+
+每个索引序列上可能会创建多种索引实例,索引框架中的每个索引序列对应一个“实例调度器”(IndexProcessor),负责调度该索引序列下的所有索引实例。
+
+总调度器(IndexManager)、实例调度器(IndexProcessor)和索引实例(Index)的ER图是:
+
+![image](https://user-images.githubusercontent.com/5548915/109540566-9c264380-7afd-11eb-92ef-c9bcc019f4f0.png)
+
+
+
+**总调度器**:对应代码类:`IndexManager` 
+
+IndexManager是索引机制的全局管理者,IoTDB接到的索引创建、删除、查询以及写入均会调用IndexManager。IndexManager会将索引操作分发给相应的IndexProcessor执行。
+
+最初的想法是,"剪枝阶段"由索引实例得到候选集,而"精化阶段"完全由索引框架完成(即对候选集所对应的原始数据进行遍历)。这样的方案是足够通用的。然而,索引技术有各种优化,强制执行上述策略会影响索引的集成。例如,"剪枝阶段"也可能要访问原始数据(RTree),精化阶段也有特定的优化(ELB-Index)。由于已经实现的这两种索引技术均有各自的优化,因此,当前的查询接口将整个查询过程全部交给索引实例完成。
+
+接口如下:
+
+```java
+// 创建索引,目前indexSeriesList中仅包含单条路径。
+public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo);
+
+// 删除索引
+public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType);
+
+// 索引查询。
+public QueryDataSet queryIndex(List<PartialPath> paths, IndexType indexType,
+      Map<String, Object> queryProps, QueryContext context, boolean alignedByTime)
+
+// 关闭操作
+private synchronized void close();
+
+```
+
+
+
+**实例调度器**:对应代码类:` IndexProcessor`
+
+每个IndexProcessor对应一个索引序列,负责管理一个索引序列下的所有索引实例操作。
+
+属性如下:
+
+```java
+/**
+每个索引实例在其关闭时会生成一段状态信息(previousData),用于下一次加载时恢复到当前状态。每个 IndexProcessor 的所有 previousData 会被存储于同一个文件。索引可以管理和存储自身的信息,因此 previousData 不是必须的。但由于特征处理器也会产生 previousData,如果索引不想管理这部分信息,则可以将其抛给 IndexProcessor 管理
+*/
+private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+// 每个索引实例对应一个可用区间管理器(IIndexUsable),每个IndexProcessor的所有IIndexUsable会被存储于同一个文件
+private Map<IndexType, IIndexUsable> usableMap;
+
+// 精化阶段(refine phase)的优化器。目前并未用到。未来设计详见{@link IIndexCandidateOrderOptimize}.
+private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+```
+
+接口如下:
+
+```java
+// 将已排序的序列写入所有索引中
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 需要等待索引全部刷写完才会返回
+public void endFlushMemTable();
+
+// 将除了NoIndex之外的索引实例刷出到磁盘
+public synchronized void close(boolean deleteFiles) throws IOException;
+
+/**
+根据传入的 indexInfoMap 刷新当前IndexProcessor中的索引实例。当用户进行创建或删除索引,则IndexProcessor中维护的索引实例就过期了,因此需要刷新。
+*/
+public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap);
+
+// 对于乱序数据,目前的设计中直接将数据标记为"索引不可用"
+void updateUnsequenceData(PartialPath path, TVList tvList);
+```
+
+
+
+**索引刷写任务器**,对应`IndexMemTableFlushTask`。
+
+在当前的设计中,索引不会每次有新数据点到来就实时更新,而是在IoTDB的flush时批量写入。当一个存储组执行flush任务时会创建一个flush线程`MemTableFlushTask`。如果系统索引功能开启(enableIndex==true), `MemTableFlushTask` 会将MemTable中的时间序列交给IndexManager写入。考虑到系统代码解耦,在flush过程中的所有索引相关操作封装在一个``IndexMemTableFlushTask`` 中。
+
+不直接写入IndexManager,而是要向IndexManager申请一个``IndexMemTableFlushTask``对象的原因是,IoTDB的存储组的flush任务是独立而并行的,但IndexManager是全局单例的。同时,IoTDB的不同存储组所创建的索引也可能互相不覆盖(即对应不同的IndexProcessor,互相可能并没有并行冲突),因此,每个存储组在flush的时候,向IndexManager申请独立的`IndexMemTableFlushTask` 可以提升并行效率。
+
+接口如下:
+
+```java
+// 插入排序后的序列
+public void buildIndexForOneSeries(PartialPath path, TVList tvList);
+
+// 等待所有IndexProcessor写入完成后再返回
+public void endFlush();
+```
+
+
+
+### 特征文件管理器、索引文件管理器
+
+特征文件管理器指定特征文件的存放位置,索引文件管理器指定索引内存结构刷写到磁盘的文件位置。这两个单元的功能都蕴含在索引调度器中。
+
+### 注册表
+
+对应`IIndexRouter` 及其子类。
+
+`IIndexRouter`的第一个功能是管理索引实例的元数据。当新的索引被创建或删除后,注册表会同步更新。
+
+`IIndexRouter`的第二个功能是将索引操作命令高效地传递到相应的`IndexProcessor`。考虑到`IoTDBIndex`、`IndexProcessor`的映射关系较为复杂且可能在未来的设计中有所改动,`IIndexRouter` 可以将这些映射工作与`IndexManager` 解耦。
+
+接口如下:
+
+```java
+// 添加索引
+boolean addIndexIntoRouter(PartialPath prefixPath, IndexInfo indexInfo, CreateIndexProcessorFunc func, boolean doSerialize) throws MetadataException;
+
+// 删除索引
+boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType);
+
+// 返回给定 indexSeries 下面的索引信息
+Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+// 获取所有 IndexProcessor 及其信息
+Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+// 返回给定 timeSeries 所属的所有 IndexProcessors
+Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+// 序列化
+void serialize(boolean doClose);
+
+// 反序列化
+void deserializeAndReload(CreateIndexProcessorFunc func);
+
+// 返回一个存储组相关的 IIndexRouter 子集。这是为了提高 IIndexRouter 访问的并行性
+IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+// 开始查询
+IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException;
+
+// 结束查询
+void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+```
+
+**ProtoIndexRouter** :是 `IIndexRouter` 的一种简单实现。
+
+子序列索引针对单一序列创建,而全序列索引针对有通配符的一组序列创建,因此 `ProtoIndexRouter` 对两种场景用不同的Map结构管理。
+
+```java
+// 子序列索引,索引序列为全路径
+private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+
+// 全序列索引,索引序列包含通配符
+private Map<PartialPath, IndexProcessorStruct> wildCardProcessorMap;
+```
+
+`IIndexRouter` 的关键功能是为一个序列快速找到其所属的`IndexProcessor`。如果该序列是全路径的,则可以在`fullPathProcessorMap` 中以 $O(1)$ 找到;否则,必须遍历 `wildCardProcessorMap` 中的每一个包含通配符的路径。
+
+
+
+### 可用区间管理器
+
+可用区间管理器用于处理乱序数据。
+大多数时间序列数据点会按照数据时间戳顺序到来,但当乱序数据到来,或者用户手动更新了某段数据后,这一段数据上的索引正确性无法保证。
+对于这些数据,可用区间管理器将其标记为索引不可用。不可用区间的序列将被加入候选集中,接受"精化阶段"的检验。
+
+接口如下:
+
+```java
+// 增加可用区间
+void addUsableRange(PartialPath fullPath, long start, long end);
+
+// 增加不可用区间
+void minusUsableRange(PartialPath fullPath, long start, long end);
+
+// 获取不可用区间,注意,全序列索引和子序列索引的返回格式不同
+Object getUnusableRange();
+
+// 序列化
+void serialize(OutputStream outputStream) throws IOException;
+
+// 反序列化
+void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+```
+
+当前版本针对全序列索引和子序列索引,分别实现了一种可用区间管理器。
+
+**SubMatchIndexUsability**:针对子序列索引的可用区间管理器。
+
+子序列索引的“索引序列”为单一序列,不包含通配符。如果这条长序列上的某一子片段被更改,则将这一片段标记为“索引不可用”,剩余片段则仍为“索引可用”区间。
+
+当前的实现采用一个链表来标记索引不可用的片段。每个链表的节点为 `RangeNode` ,代表一段不可用的时间段。
+
+```java
+class RangeNode {
+    long start;
+    long end;
+    RangeNode next;
+}
+```
+
+`SubMatchIndexUsability` 的构造函数有两个:
+
+```java
+/**
+ * 构造函数
+ * @param maxSizeOfUsableSegments 不可用区间的最大段数
+ * @param initAllUsable 如果为true,最初时间段均为“索引可用”,如果为false,最初均为“索引不可用”
+ */
+SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+  ...
+}
+```
+
+当前实现中对于 `RangeNode` 链表的访问和更新是线性遍历的,然而用户可以指定不可用区间的最大段数 $M$ ,因此链表的访问和更新复杂度可以控制在 `O(M)` ,即常数量级。当不可用区间的段数增加到上限时,会将新的不可用区间与较近的区间合并,从而控制区间段数。这样会使得“不可用区间”范围扩大,但将“可用区间”标记为“不可用区间”仅会让一些“被剪枝”的序列进入精化阶段,而不会造成漏解(即,将本来正确的结果错误地排除掉),因此 $M$ 参数并不会影响索引查询的正确性。
+
+`getUnusableRange()` 函数返回一系列不可用区间的时间过滤器:`List<Filter>` 。
+
+例如,在序列 `root.Wind.AZQ02.Speed` 上创建子序列索引。
+
+1.  `initAllUsable = false`,最初整个区间段均为“索引不可用”;`maxSizeOfUsableSegments=2`,最多允许两个索引不可用片段存在;
+2. 增加可用片段 $[5,7]$ ,不可用区间变为  $[MIN,4] \cup [8,MAX]$ ;
+3. 增加可用片段 $[2,3]$ ,不可用区间应当分裂为  $[MIN,1] \cup [4,4] \cup [8,MAX]$ ,然而不可用区间的段数超过了上限,因此停止分裂,仍然保持  $[MIN,4] \cup [8,MAX]$ 
+
+**WholeMatchIndexUsability**:针对全序列索引的可用区间管理器。
+
+全序列索引的“索引序列”包含通配符,每一条符合规则的时间序列作为一个整体插入索引。因此,对于一条序列的任何更改都会使整条序列变为“索引不可用”。因此,本类采用一个集合 `unusableSeriesSet` 来标记索引不可用的序列。
+
+例如,在索引序列 `root.Ery.*.Glu ` 之上创建全序列索引。当某条序列 `root.Ery.01.Glu` 被更改,则将其加入集合 `unusableSeriesSet` ,该条序列被标记为“索引不可用”。
+
+`getUnusableRange()` 函数返回被标记为“索引不可用”的序列的集合 `Set<PartialPath>` 。
+
+### 索引查询处理器
+
+查询处理模块负责执行索引相关的查询。包括索引查询执行器、查询优化器,在查询时也会用到可用区间管理器。
+
+**查询执行器**:对应类`QueryIndexExecutor` 。
+
+尽管我们可以从相似性索引的查询过程中提取共性,将其分为“剪枝阶段”和“精化阶段”,并且精化阶段可以由索引框架接管,然而,许多索引技术(包括当前实现的两种索引:RTree和ELB索引)会将“剪枝阶段”和“精化阶段”结合起来优化,强制接管“精化阶段”可能会影响索引查询效率。因此,现阶段的查询处理器直接调用`IndexManager#queryIndex` 函数。
+
+**查询结果类**:对应类 `IndexQueryDataSet` 继承自 `ListDataSet`。
+
+
+
+### 索引接口
+
+对应`IoTDBIndex` 。
+
+索引接口是索引实现者唯一需要实现的部分,接口包括指定特征提取器、写入索引、序列化索引、索引查询等等,详细阐述参见2.2.2.1节。
+
+## 流程设计
+
+### 索引写入流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540629-b6602180-7afd-11eb-8dbe-e38c91921a7e.png)
+
+
+
+上图展现了索引机制的写入流程:
+
+* 当用户向数据库系统写入新的数据后,数据库系统通知索引总调度器 `IndexManager`。
+* 总调度器首先向注册表进行校验,判断数据所在的序列是否创建过索引。
+* 当校验成功后,总调度器将数据传递到相应的索引实例调度器,图中省略这一步骤。
+* 调度器首先使用特征提取器将新数据转换为特征,然后由索引实例消费掉。
+* 当数据写入索引实例后,更新可用区间管理器。至此,完成整个写入流程。
+
+在当前的实现中,仅当触发IoTDB的刷写操作时(`MemTableFlushTask`),索引机制才触发索引的批量写入操作。这样带来两个好处:首先,在批量写入的情况下,
+传感器将多个数据点积攒成序列,方便了特征提取器的处理;其次,考虑到工业物联网数据的乱序情况,IoTDB会将一段时间内的数据进行排序再传给索引机制,这样减少了乱序数据给索引带来的负面影响。
+
+延迟写入索引并不会影响索引查询的正确性。对于已经写入IoTDB但尚未触发刷写操作的数据,索引机制中的可用区间管理器会将其标记为"索引不可用",因此,这部分数据会直接进入精化阶段进行计算,不会被遗漏。
+
+### 索引查询流程
+
+![image](https://user-images.githubusercontent.com/5548915/109540634-b95b1200-7afd-11eb-800a-71b3b11ce16d.png)
+
+当用户发起索引查询时,索引机制为其创建单独的线程并执行查询。相似性索引的一般查询流程如下:
+
+1. 用户发起索引查询时,IoTDB会为其创建一个单独的线程并执行整个查询流程。这一执行线程调用索引机制并传入查询条件。索引机制首先调用注册表,判断该序列上是否创建过索引、索引是否支持这一查询条件。
+2. 校验成功后,`IndexManager` 根据查询条件中指定的查询目标,将执行线程和查询条件传递到相应的索引实例调度器`IndexProcessor` 。
+3. 执行线程调用特征提取器 `FeatureExtractor`,对查询序列提取特征。然后将查询特征及其他查询条件输入索引实例 `IoTDBIndex` 。
+   1. 执行过滤阶段,并返回候选集,候选集中包含了一系列指向原始数据的指针。至此,查询处理模块的**过滤单元**完成。
+   2. 在进入精化阶段前,执行线程首先调用可用区间管理器,将索引不可用区间中的序列加入候选集中,避免乱序数据造成索引错误剪枝。
+   3. 然后调用查询优化器,对候选集的访问顺序进行重排序。
+   4. 接下来,执行线程访问IoTDB,读取候选集所指向的原始数据并完成精确计算。
+   5. 至此,查询处理模块的精化单元完成,返回结果。
+
+上述流程的第三步中,索引框架接管了精化阶段。但如果索引实例有更多的优化,也可以完全接管查询过程并直接返回结果。反映到模块调用上,即 `IndexProcessor` 直接调用 `IoTDBIndex#query` 函数,返回 `QueryDataSet` 。
+
+### 索引更新与删除
+
+索引更新主要涉及两个方面。首先,当IoTDB接收乱序数据后,索引可用区间管理器会相应更新,从而保证后续查询的正确性。其次,当IoTDB完成乱序数据整理和合并后,索引框架将对索引发起重建,这一步在未来实现。
+
+当索引被删除后,索引目录下的文件将被删除。
+
+
+
+## 文件结构
+
+![image](https://user-images.githubusercontent.com/5548915/109540601-a9433280-7afd-11eb-8bdb-3f77181c1637.png)
+
+上图左图是索引框架的文件组织结构。
+
+* 总文件夹index下包括数据文件夹和元数据文件夹;
+  * 元数据文件夹中目前仅包括注册表文件夹;
+  * 数据文件夹下,每个索引序列(对应一个 `IndexProcessor` )都对应一个文件夹;
+    * 索引序列文件夹会创建一个状态文件和可用区间管理器文件;
+    * 每个索引序列文件夹下可能创建了多个索引实例;
+
+上图右图是一个例子。
+
+
+
+## 并行性分析(TODO)
+
+## 复杂度分析(TODO)
+
+量化,时间复杂度,内存/磁盘空间复杂度。
+
+
+
+## 问题讨论
+
+### 索引读写并发能力-锁粒度
+
+为每一个索引实例分配一个读写锁,并由IndexProcessor控制索引的读写。
+
+由于原始数据是分批刷写磁盘的,因此索引也需要支持批量更新。这意味着索引不能在创建索引之前就看到全部原始数据。一些需要基于全局数据确定参数的索引的性能可能会受到影响(如VaFile和基于Kmeans的倒排索引)。
+
+目前为每个索引维护了读写锁(`org.apache.iotdb.db.index.IndexProcessor#indexLockMap`)。允许对索引的并发查询,但索引的写入和查询是互斥的。这一设计有两点值得讨论:
+
+1. 查询并发:并发查询提升了查询效率,但要求索引是查询并发安全的;
+2. 读写互斥:这意味着IoTDB刷写操作和索引查询之间会互相阻塞,无疑降低了效率。然而要取消这一限制,则要求索引是读写并发安全的。
+
+根据开发者对一些开源索引代码的调研,索引通常可以满足查询并发安全,但较难满足读写并发安全,因此设计了读写锁机制。但是,值得讨论的是,是否对集成到IoTDB的索引代码提出更高的要求,从而提升IoTDB的效率呢?
+
+### 索引更新删除能力-可用区间
+
+TODO
+
+
+
+# 未来规划
+
+预计在第三次PR中提交的模块:
+
+* 特征提取器的实现 `IndexFeatureExtractor` :包括全序列检索和子序列检索中一些常见的特征提取方法,例如全序列检索用到的PAA特征、子序列检索中用到的两种滑动窗口模型等等;
+* 索引算法:基于以上特征提取器,实现了两种基本的检索算法:基于PAA特征的R树索引和等长特征块索引(ELB,回答子序列检索)
+
+留待未来实现的模块如下:
+
+* 统计模块IndexStats:几乎所有相似性索引(甚至可以推广到所有索引)都会关注一些共通的统计量,例如:CPU时间、磁盘访问时间、内存占用、磁盘占用、查询剪枝率、近似查询中的查准率和查全率等等。统计模块为其提供全面而标准的工具函数,用于索引性能评估和不同索引之间的性能比较;
+* 候选集顺序优化器 `IIndexCandidateOrderOptimize`:相似性索引剪枝后得到候选集列表。由于索引并未考虑IoTDB的数据组织方式,因此候选集的顺序可能不是最优的。候选集顺序优化器对此进行优化;
+* 其他索引技术
+
+
+
+# 测试验证(TODO)
+
+## 测试目标
+
+正确性测试或性能(对比)测试
+
+##  测试方案
+
+\- 测试系统
+
+\- 被测系统
+
+\- 测试环境
+
+\- 负载描述
+
+## 测试结果
+
+## 测试结论
\ No newline at end of file
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index eb527ae..476d547 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -657,6 +657,12 @@ default_index_window_range=10
 # buffer parameter for index processor.
 index_buffer_size=134217728
 
+# the max returned size of index query result
+max_index_query_result_size=5
+
+# the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+default_max_size_of_unusable_segments=20
+
 # whether enable data partition. If disabled, all data belongs to partition 0
 enable_partition=false
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 030fde2..4e44ef8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
 import org.apache.iotdb.db.exception.LoadConfigurationException;
+import org.apache.iotdb.db.index.IndexProcessor;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -247,11 +248,11 @@ public class IoTDBConfig {
   /**
    * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
    * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
-   * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
-   * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
-   * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
-   * will be flushed to the disk file. As a result, data in one series may be divided into more than
-   * one part and indexed separately.
+   * one {@linkplain IndexProcessor IndexProcessor} share a total common buffer size. With the
+   * memory-control mechanism, the occupied memory of all raw data and index structures will be
+   * counted. If the memory buffer size reaches this threshold, the indexes will be flushed to the
+   * disk file. As a result, data in one series may be divided into more than one part and indexed
+   * separately.
    */
   private long indexBufferSize = 128 * 1024 * 1024L;
 
@@ -261,6 +262,10 @@ public class IoTDBConfig {
    */
   private int defaultIndexWindowRange = 10;
 
+  /**
+   * the default value of max size of the index unusable segments, used in SubMatchIndexUsability
+   */
+  private int defaultMaxSizeOfUnusableSegments = 20;
   /** index directory. */
   private String indexRootFolder = "data" + File.separator + "index";
 
@@ -637,6 +642,8 @@ public class IoTDBConfig {
   /** the number of virtual storage groups per user-defined storage group */
   private int virtualStorageGroupNum = 1;
 
+  private int maxIndexQueryResultSize = 5;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2035,6 +2042,22 @@ public class IoTDBConfig {
     this.defaultIndexWindowRange = defaultIndexWindowRange;
   }
 
+  public int getMaxIndexQueryResultSize() {
+    return this.maxIndexQueryResultSize;
+  }
+
+  public void setMaxIndexQueryResultSize(int maxIndexQueryResultSize) {
+    this.maxIndexQueryResultSize = maxIndexQueryResultSize;
+  }
+
+  public int getDefaultMaxSizeOfUnusableSegments() {
+    return defaultMaxSizeOfUnusableSegments;
+  }
+
+  public void setDefaultMaxSizeOfUnusableSegments(int defaultSizeOfUsableSegments) {
+    this.defaultMaxSizeOfUnusableSegments = defaultSizeOfUsableSegments;
+  }
+
   public int getVirtualStorageGroupNum() {
     return virtualStorageGroupNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ea507f9..f1a86b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -436,6 +436,18 @@ public class IoTDBDescriptor {
         conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
       }
 
+      conf.setMaxIndexQueryResultSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "max_index_query_result_size",
+                  Integer.toString(conf.getMaxIndexQueryResultSize()))));
+
+      conf.setDefaultMaxSizeOfUnusableSegments(
+          Integer.parseInt(
+              properties.getProperty(
+                  "default_max_size_of_unusable_segments",
+                  Integer.toString(conf.getDefaultMaxSizeOfUnusableSegments()))));
+
       conf.setmManagerCacheSize(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..21b0e3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.IndexMemTableFlushTask;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -58,7 +61,9 @@ public class MemTableFlushTask {
           ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
           : new LinkedBlockingQueue<>();
 
+  private final boolean enabledIndex;
   private String storageGroup;
+  private final boolean sequence;
 
   private IMemTable memTable;
 
@@ -71,7 +76,7 @@ public class MemTableFlushTask {
    * @param storageGroup current storage group
    */
   public MemTableFlushTask(
-      IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+      IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup, boolean sequence) {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
@@ -81,6 +86,8 @@ public class MemTableFlushTask {
         "flush task of Storage group {} memtable is created, flushing to file {}.",
         storageGroup,
         writer.getFile().getName());
+    this.enabledIndex = IoTDBDescriptor.getInstance().getConfig().isEnableIndex();
+    this.sequence = sequence;
   }
 
   /** the function for flushing memtable. */
@@ -100,6 +107,14 @@ public class MemTableFlushTask {
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
+    IndexMemTableFlushTask indexFlushTask = null;
+    if (enabledIndex) {
+      try {
+        indexFlushTask = IndexManager.getInstance().getIndexMemFlushTask(storageGroup, sequence);
+      } catch (Exception e) {
+        LOGGER.error("meet Exception in getIndexMemFlushTask, not affect the memtable flushing", e);
+      }
+    }
 
     // for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
@@ -114,6 +129,16 @@ public class MemTableFlushTask {
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.put(new Pair<>(tvList, desc));
+        if (enabledIndex && indexFlushTask != null) {
+          try {
+            String deviceId = memTableEntry.getKey();
+            String measurementId = iWritableMemChunkEntry.getKey();
+            indexFlushTask.buildIndexForOneSeries(new PartialPath(deviceId, measurementId), tvList);
+          } catch (Exception e) {
+            LOGGER.error(
+                "meet Exception in buildIndexForOneSeries, not affect the memtable flushing", e);
+          }
+        }
       }
 
       encodingTaskQueue.put(new EndChunkGroupIoTask());
@@ -133,6 +158,13 @@ public class MemTableFlushTask {
     }
 
     ioTaskFuture.get();
+    if (enabledIndex && indexFlushTask != null) {
+      try {
+        indexFlushTask.endFlush();
+      } catch (Exception e) {
+        LOGGER.error("meet Exception in endFlush, not affect the memtable flushing", e);
+      }
+    }
 
     try {
       writer.writePlanIndices();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..6a73393 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -775,7 +775,7 @@ public class TsFileProcessor {
       try {
         writer.mark();
         MemTableFlushTask flushTask =
-            new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+            new MemTableFlushTask(memTableToFlush, writer, storageGroupName, sequence);
         flushTask.syncFlushMemTable();
       } catch (Exception e) {
         if (writer == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
index 7b8b8cf..89bc2fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
@@ -18,6 +18,7 @@
 package org.apache.iotdb.db.exception.index;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 public class QueryIndexException extends QueryProcessException {
 
@@ -26,4 +27,8 @@ public class QueryIndexException extends QueryProcessException {
   public QueryIndexException(String message, int errorCode) {
     super(message, errorCode);
   }
+
+  public QueryIndexException(String message) {
+    super(message, TSStatusCode.INDEX_QUERY_ERROR.getStatusCode());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
index 7b8b8cf..78ba38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/UnsupportedIndexFuncException.java
@@ -17,13 +17,13 @@
  */
 package org.apache.iotdb.db.exception.index;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-public class QueryIndexException extends QueryProcessException {
+public class UnsupportedIndexFuncException extends QueryIndexException {
 
-  private static final long serialVersionUID = 9019233783504576296L;
+  private static final long serialVersionUID = 4185676677334759039L;
 
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
+  public UnsupportedIndexFuncException(String message) {
+    super(message, TSStatusCode.UNSUPPORTED_INDEX_FUNC_ERROR.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
new file mode 100644
index 0000000..1104c99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexBuildTaskPoolManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A pool that limits the number of threads building indexes concurrently. */
+public class IndexBuildTaskPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildTaskPoolManager.class);
+
+  private IndexBuildTaskPoolManager() {
+    int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentIndexBuildThread();
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.INDEX_SERVICE.getName());
+  }
+
+  public static IndexBuildTaskPoolManager getInstance() {
+    return IndexBuildTaskPoolManager.InstanceHolder.instance;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "index building task";
+  }
+
+  @Override
+  public void start() {
+    if (pool == null) {
+      int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentIndexBuildThread();
+      pool =
+          IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.INDEX_SERVICE.getName());
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (pool != null) {
+      close();
+      pool = null;
+    }
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+      // allowed to do nothing
+    }
+
+    private static IndexBuildTaskPoolManager instance = new IndexBuildTaskPoolManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
new file mode 100644
index 0000000..053596b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_DATA_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.META_DIR_NAME;
+import static org.apache.iotdb.db.index.common.IndexConstant.ROUTER_DIR;
+
+/**
+ * IndexManager is the global manager of the index framework. IoTDB calls it for index creation,
+ * deletion, query and insertion, and IndexManager passes each operation to the corresponding
+ * IndexProcessor.
+ */
+public class IndexManager implements IndexManagerMBean, IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+  /**
+   * The index root directory. All index metadata files, index data files are stored in this
+   * directory.
+   */
+  private final String indexRootDirPath;
+
+  private final String indexMetaDirPath;
+  private final String indexRouterDir;
+  private final String indexDataDirPath;
+  private final IIndexRouter router;
+
+  /** A function interface to construct an index processor. */
+  private CreateIndexProcessorFunc createIndexProcessorFunc;
+
+  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private IndexManager() {
+    indexRootDirPath = DirectoryManager.getInstance().getIndexRootFolder();
+    indexMetaDirPath = indexRootDirPath + File.separator + META_DIR_NAME;
+    indexRouterDir = indexMetaDirPath + File.separator + ROUTER_DIR;
+    indexDataDirPath = indexRootDirPath + File.separator + INDEX_DATA_DIR_NAME;
+    createIndexProcessorFunc =
+        (indexSeries, indexInfoMap) ->
+            new IndexProcessor(
+                indexSeries,
+                IndexUtils.removeIllegalStarInDir(indexDataDirPath + File.separator + indexSeries));
+    router = IIndexRouter.Factory.getIndexRouter(indexRouterDir);
+  }
+
+  /**
+   * Given an IndexSeries, return its feature file path (unused currently).
+   *
+   * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+   * @param indexType the type of index
+   * @return the feature directory path for this index.
+   */
+  private String getFeatureFileDirectory(PartialPath path, IndexType indexType) {
+    return IndexUtils.removeIllegalStarInDir(
+        indexDataDirPath + File.separator + path.getFullPath() + File.separator + indexType);
+  }
+
+  /**
+   * Given an IndexSeries, return its data file path (unused currently).
+   *
+   * @param path the path on which the index is created, e.g. Root.ery.*.Glu or Root.Wind.d1.Speed.
+   * @param indexType the type of index
+   * @return the feature directory path for this index.
+   */
+  private String getIndexDataDirectory(PartialPath path, IndexType indexType) {
+    return getFeatureFileDirectory(path, indexType);
+  }
+
+  /**
+   * Execute the index creation. Due to the complex mapping relationship between the time series and
+   * the index instances, we encapsulate the index metadata management into the router {@link
+   * IIndexRouter} for stability.
+   *
+   * @param indexSeriesList a singleton list up to now.
+   * @param indexInfo the index information.
+   */
+  public void createIndex(List<PartialPath> indexSeriesList, IndexInfo indexInfo)
+      throws MetadataException {
+    if (!indexSeriesList.isEmpty()) {
+      router.addIndexIntoRouter(indexSeriesList.get(0), indexInfo, createIndexProcessorFunc, true);
+    }
+  }
+
+  /**
+   * Execute the index deletion.
+   *
+   * @param indexSeriesList a singleton list up to now.
+   * @param indexType the index type to be dropped.
+   */
+  public void dropIndex(List<PartialPath> indexSeriesList, IndexType indexType)
+      throws MetadataException, IOException {
+    if (!indexSeriesList.isEmpty()) {
+      router.removeIndexFromRouter(indexSeriesList.get(0), indexType);
+    }
+  }
+
+  /**
+   * When the storage group flushes, we build an {@link IndexMemTableFlushTask} for index insertion.
+   *
+   * <p>So far, the index insertion is triggered only when Memtables flush. A storage group contains
+   * several series and each of these series may create several indexes. In other words, one storage
+   * group may correspond to several {@linkplain IndexProcessor}.
+   *
+   * <p>This method returns a flush task holding a router to find all {@linkplain IndexProcessor}
+   * related to this storage group.
+   *
+   * @param storageGroupPath the path of the storage group
+   * @param sequence true if it's sequence data, otherwise it's unsequence data
+   * @return a flush task wrapping the router for all {@linkplain IndexProcessor} related to this
+   *     storage group, together with the sequence flag
+   * @see IndexMemTableFlushTask
+   */
+  public IndexMemTableFlushTask getIndexMemFlushTask(String storageGroupPath, boolean sequence) {
+    // StorageGroupPath may contain file separator, we put a temp patch here.
+    storageGroupPath = storageGroupPath.replace(File.separatorChar, '/');
+    String realStorageGroupPath = storageGroupPath.split("/")[0];
+    IIndexRouter sgRouter = router.getRouterByStorageGroup(realStorageGroupPath);
+    return new IndexMemTableFlushTask(sgRouter, sequence);
+  }
+
+  /**
+   * Index query.
+   *
+   * <p>The initial idea is that index instances only process the "pruning phase" to prune some
+   * negative items and return a candidate list; the framework finishes the rest (so-called
+   * "post-processing phase" or "refinement phase", to query the raw time series by the candidate
+   * list and then to verify which series in the candidate list are real positive results).
+   *
+   * <p>The above design is common enough for all similarity index methods. However, index
+   * technology has various optimizations, and enforcing the above strategy will affect the freedom
+   * of index integration. The two implemented indexes (ELB index and RTree index) have their own
+   * optimizations which combine the pruning phase and post-processing phase. Therefore, in the
+   * current version, the query process is entirely passed to the index instance.
+   *
+   * @param paths the series to be queried.
+   * @param indexType the index type to be queried.
+   * @param queryProps the properties of this query.
+   * @param context the query context.
+   * @param alignedByTime whether to align the index result by timestamps.
+   * @return the index query result.
+   */
+  public QueryDataSet queryIndex(
+      List<PartialPath> paths,
+      IndexType indexType,
+      Map<String, Object> queryProps,
+      QueryContext context,
+      boolean alignedByTime)
+      throws QueryIndexException, StorageEngineException {
+    if (paths.size() != 1) {
+      throw new QueryIndexException("Index allows to query only one path");
+    }
+    PartialPath queryIndexSeries = paths.get(0);
+    IndexProcessorStruct indexProcessorStruct =
+        router.startQueryAndCheck(queryIndexSeries, indexType, context);
+    List<StorageGroupProcessor> list = indexProcessorStruct.addMergeLock();
+    try {
+      return indexProcessorStruct.processor.query(indexType, queryProps, context, alignedByTime);
+    } finally {
+      StorageEngine.getInstance().mergeUnLock(list);
+      router.endQuery(indexProcessorStruct.processor.getIndexSeries(), indexType, context);
+    }
+  }
+
+  private void prepareDirectory() {
+    File rootDir = IndexUtils.getIndexFile(indexRootDirPath);
+    if (!rootDir.exists()) {
+      rootDir.mkdirs();
+    }
+    File routerDir = IndexUtils.getIndexFile(indexRouterDir);
+    if (!routerDir.exists()) {
+      routerDir.mkdirs();
+    }
+    File metaDir = IndexUtils.getIndexFile(indexMetaDirPath);
+    if (!metaDir.exists()) {
+      metaDir.mkdirs();
+    }
+    File dataDir = IndexUtils.getIndexFile(indexDataDirPath);
+    if (!dataDir.exists()) {
+      dataDir.mkdirs();
+    }
+  }
+
+  private void deleteDroppedIndexData() throws IOException, IllegalPathException {
+    for (File processorDataDir :
+        Objects.requireNonNull(IndexUtils.getIndexFile(indexDataDirPath).listFiles())) {
+      String processorName = processorDataDir.getName();
+      Map<IndexType, IndexInfo> infos =
+          router.getIndexInfosByIndexSeries(new PartialPath(processorName));
+      if (infos.isEmpty()) {
+        FileUtils.deleteDirectory(processorDataDir);
+      } else {
+        for (File indexDataDir : Objects.requireNonNull(processorDataDir.listFiles())) {
+          if (indexDataDir.isDirectory()
+              && !infos.containsKey(IndexType.valueOf(indexDataDir.getName()))) {
+            FileUtils.deleteDirectory(indexDataDir);
+          }
+        }
+      }
+    }
+  }
+
+  /** close the index manager. */
+  private synchronized void close() {
+    router.serialize(true);
+  }
+
+  @Override
+  public void start() throws StartupException {
+    if (!config.isEnableIndex()) {
+      return;
+    }
+    IndexBuildTaskPoolManager.getInstance().start();
+    try {
+      JMXService.registerMBean(this, ServiceType.INDEX_SERVICE.getJmxName());
+      prepareDirectory();
+      router.deserializeAndReload(createIndexProcessorFunc);
+      deleteDroppedIndexData();
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  /**
+   * As IoTDB has no normal shutdown mechanism, this function will not be called. To keep the
+   * index metadata safe, the router needs to serialize it every time createIndex or dropIndex is
+   * called.
+   */
+  @Override
+  public void stop() {
+    if (!config.isEnableIndex()) {
+      return;
+    }
+    close();
+    IndexBuildTaskPoolManager.getInstance().stop();
+    JMXService.deregisterMBean(ServiceType.INDEX_SERVICE.getJmxName());
+  }
+
+  public static IndexManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.INDEX_SERVICE;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static IndexManager instance = new IndexManager();
+  }
+
+  @TestOnly
+  public synchronized void deleteAll() throws IOException {
+    logger.info("Start deleting all storage groups' timeseries");
+    close();
+
+    File indexMetaDir = IndexUtils.getIndexFile(this.indexMetaDirPath);
+    if (indexMetaDir.exists()) {
+      FileUtils.deleteDirectory(indexMetaDir);
+    }
+
+    File indexDataDir = IndexUtils.getIndexFile(this.indexDataDirPath);
+    if (indexDataDir.exists()) {
+      FileUtils.deleteDirectory(indexDataDir);
+    }
+    File indexRootDir =
+        IndexUtils.getIndexFile(DirectoryManager.getInstance().getIndexRootFolder());
+    if (indexRootDir.exists()) {
+      FileUtils.deleteDirectory(indexRootDir);
+    }
+  }
+
+  @TestOnly
+  public IIndexRouter getRouter() {
+    return router;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
new file mode 100644
index 0000000..95cb0a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManagerMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+public interface IndexManagerMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
new file mode 100644
index 0000000..dea0b1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexMemTableFlushTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+/**
+ * IndexMemTableFlushTask is responsible for the index insertion when a TsFileProcessor flushes.
+ * IoTDB creates a MemTableFlushTask when a memtable is flushed to disk. After MemTableFlushTask
+ * sorts a series, the series is passed to an IndexMemTableFlushTask. The IndexMemTableFlushTask
+ * detects whether one or more indexes have been created on this series, and passes its data to
+ * the corresponding IndexProcessors so that it is inserted into the corresponding indexes.
+ *
+ * <p>IndexMemTableFlushTask may cover more than one index processor.
+ */
+public class IndexMemTableFlushTask {
+
+  private final IIndexRouter router;
+  private final boolean sequence;
+
+  /** it should be immutable. */
+  IndexMemTableFlushTask(IIndexRouter router, boolean sequence) {
+    // check all processors
+    this.router = router;
+    this.sequence = sequence;
+    // in current version, we don't build index for unsequence block
+    if (sequence) {
+      for (IndexProcessorStruct p : router.getAllIndexProcessorsAndInfo()) {
+        p.processor.startFlushMemTable();
+      }
+    }
+  }
+
+  /**
+   * insert sorted time series
+   *
+   * @param path the path of time series
+   * @param tvList the sorted data
+   */
+  public void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+    // in current version, we don't build index for unsequence block, but only update the index
+    // usability range.
+    if (sequence) {
+      router.getIndexProcessorByPath(path).forEach(p -> p.buildIndexForOneSeries(path, tvList));
+    } else {
+      router.getIndexProcessorByPath(path).forEach(p -> p.updateUnsequenceData(path, tvList));
+    }
+  }
+
+  /** wait for all IndexProcessors to finish building indexes. */
+  public void endFlush() {
+    if (sequence) {
+      router.getAllIndexProcessorsAndInfo().forEach(p -> p.processor.endFlushMemTable());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
new file mode 100644
index 0000000..95cf721
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.IndexRuntimeException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.common.func.IndexNaiveFunc;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.router.IIndexRouter;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Each {@code IndexProcessor} manages all index instances under an <b>IndexSeries</b>.
+ *
+ * <p>An <b>IndexSeries</b> is one series or a set of series that an index instance acts on.
+ *
+ * <ul>
+ *   <li>For whole matching, the IndexSeries is a set of series represented by a path with wildcard
+ *       characters. For example, creating index on IndexSeries "root.steel.*.temperature" means all
+ *       series belong to this IndexSeries will be inserted, such as root.steel.s1.temperature,
+ *       root.steel.s2.temperature, ...
+ *   <li>For subsequence matching, the IndexSeries is a full path. For example, creating index on
+ *       IndexSeries "root.wind.azq01.speed" means all subsequence of this series will be inserted.
+ * </ul>
+ *
+ * In current design, users are allowed to create more than one type of index on an IndexSeries. e.g.
+ * we can create RTree and DS-Tree on "root.steel.*.temperature". New coming data will be inserted
+ * into both of these two indexes.
+ */
+public class IndexProcessor implements Comparable<IndexProcessor> {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexProcessor.class);
+
+  private PartialPath indexSeries;
+  private final String indexSeriesDirPath;
+  private TSDataType tsDataType;
+  private final IndexBuildTaskPoolManager indexBuildPoolManager;
+  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  private Map<IndexType, ReadWriteLock> indexLockMap;
+
+  /**
+   * How many indexes of this IndexProcessor are currently inserting data. If 0, no indexes are
+   * inserting, that is, this IndexProcessor is not in flushing state.
+   */
+  private AtomicInteger numIndexBuildTasks;
+
+  /** Whether the processor has been closed */
+  private volatile boolean closed;
+
+  /** The map of index instances. */
+  private Map<IndexType, IoTDBIndex> allPathsIndexMap;
+
+  /** Some status data saved when index is closed for next open. */
+  private final Map<IndexType, ByteBuffer> previousDataBufferMap;
+
+  private final String previousDataBufferFile;
+
+  /**
+   * Each index instance corresponds to an {@linkplain IIndexUsable}. All IIndexUsable of a
+   * IndexProcessor are stored in one file.
+   */
+  private Map<IndexType, IIndexUsable> usableMap;
+
+  private final String usableFile;
+
+  /** The optimizer for the post-processing phase (refinement phase). Unused yet. */
+  private final IIndexCandidateOrderOptimize refinePhaseOptimizer;
+
+  /**
+   * Creates a processor for one IndexSeries, creates its directory if it does not exist yet, and
+   * reloads the previous-buffer and usability state from that directory when the files exist.
+   *
+   * @param indexSeries the series (full path or wildcard path) this processor manages
+   * @param indexSeriesDirPath the directory holding all files of this processor
+   */
+  public IndexProcessor(PartialPath indexSeries, String indexSeriesDirPath) {
+    this.indexBuildPoolManager = IndexBuildTaskPoolManager.getInstance();
+
+    // plain state
+    this.indexSeries = indexSeries;
+    this.indexSeriesDirPath = indexSeriesDirPath;
+    this.previousDataBufferFile = indexSeriesDirPath + File.separator + "previousBuffer";
+    this.usableFile = indexSeriesDirPath + File.separator + "usableMap";
+    this.numIndexBuildTasks = new AtomicInteger(0);
+    this.closed = false;
+
+    // make sure the directory of this IndexSeries exists
+    File seriesDir = IndexUtils.getIndexFile(indexSeriesDirPath);
+    if (!seriesDir.exists()) {
+      seriesDir.mkdirs();
+    }
+
+    // per-index-type containers
+    this.allPathsIndexMap = new EnumMap<>(IndexType.class);
+    this.indexLockMap = new EnumMap<>(IndexType.class);
+    this.usableMap = new EnumMap<>(IndexType.class);
+    this.previousDataBufferMap = new EnumMap<>(IndexType.class);
+
+    this.tsDataType = initSeriesType();
+    this.refinePhaseOptimizer = IIndexCandidateOrderOptimize.Factory.getOptimize();
+
+    // restore the state saved by the last close, if any
+    deserializePreviousBuffer();
+    deserializeUsable(indexSeries);
+  }
+
+  /**
+   * Resolves the data type shared by the index instances of this processor. For a full path the
+   * type of that series is used; for a wildcard path the type of the first matched series is used.
+   * All time series covered by this IndexProcessor should have the same data type.
+   *
+   * @return tsDataType of this IndexProcessor
+   * @throws IndexRuntimeException if the wildcard path matches no series or the metadata lookup
+   *     fails
+   */
+  private TSDataType initSeriesType() {
+    try {
+      PartialPath representative = indexSeries;
+      if (!indexSeries.isFullPath()) {
+        // only one matched series is needed to learn the type
+        List<PartialPath> matched =
+            IoTDB.metaManager.getAllTimeseriesPathWithAlias(indexSeries, 1, 0).left;
+        if (matched.isEmpty()) {
+          throw new IndexRuntimeException("No series in the wildcard path");
+        }
+        representative = matched.get(0);
+      }
+      return MManager.getInstance().getSeriesType(representative);
+    } catch (MetadataException e) {
+      throw new IndexRuntimeException("get type failed. ", e);
+    }
+  }
+
+  /** Returns the directory of one index type: {@code <indexSeriesDirPath>/<indexType>}. */
+  private String getIndexDir(IndexType indexType) {
+    return String.join(File.separator, indexSeriesDirPath, String.valueOf(indexType));
+  }
+
+  /**
+   * Writes the usability information of every index to {@code usableFile}: first the number of
+   * entries, then for each entry the serialized index type followed by the serialized {@link
+   * IIndexUsable}. An {@link IOException} is logged and the write is given up.
+   */
+  private void serializeUsable() {
+    File target = SystemFileFactory.INSTANCE.getFile(usableFile);
+    try (OutputStream out = new FileOutputStream(target)) {
+      ReadWriteIOUtils.write(usableMap.size(), out);
+      for (Entry<IndexType, IIndexUsable> usableEntry : usableMap.entrySet()) {
+        ReadWriteIOUtils.write(usableEntry.getKey().serialize(), out);
+        usableEntry.getValue().serialize(out);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize usability. Given up.", e);
+    }
+  }
+
+  /**
+   * Writes the saved status buffers to {@code previousDataBufferFile}: first the number of
+   * entries, then for each entry the serialized index type followed by its buffer. An {@link
+   * IOException} is logged and the write is given up.
+   */
+  private void serializePreviousBuffer() {
+    File target = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+    try (OutputStream out = new FileOutputStream(target)) {
+      ReadWriteIOUtils.write(previousDataBufferMap.size(), out);
+      for (Entry<IndexType, ByteBuffer> bufferEntry : previousDataBufferMap.entrySet()) {
+        ReadWriteIOUtils.write(bufferEntry.getKey().serialize(), out);
+        ReadWriteIOUtils.write(bufferEntry.getValue(), out);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize previous buffer. Given up.", e);
+    }
+  }
+
+  /**
+   * Loads the status buffers written by {@link #serializePreviousBuffer()} into {@code
+   * previousDataBufferMap}. Does nothing if the file does not exist; an {@link IOException} is
+   * logged and the remaining entries are given up.
+   */
+  private void deserializePreviousBuffer() {
+    File source = SystemFileFactory.INSTANCE.getFile(previousDataBufferFile);
+    if (source.exists()) {
+      try (InputStream in = new FileInputStream(source)) {
+        int remaining = ReadWriteIOUtils.readInt(in);
+        while (remaining-- > 0) {
+          IndexType type = IndexType.deserialize(ReadWriteIOUtils.readShort(in));
+          ByteBuffer saved = ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(in);
+          previousDataBufferMap.put(type, saved);
+        }
+      } catch (IOException e) {
+        logger.error("Error when deserialize previous buffer. Given up.", e);
+      }
+    }
+  }
+
+  /**
+   * Loads the usability information written by {@link #serializeUsable()} into {@code usableMap}.
+   * Does nothing if the file does not exist; a read or path error is logged and the remaining
+   * entries are given up.
+   *
+   * @param indexSeries the IndexSeries passed to the usability factory for each entry
+   */
+  private void deserializeUsable(PartialPath indexSeries) {
+    File source = SystemFileFactory.INSTANCE.getFile(usableFile);
+    if (source.exists()) {
+      try (InputStream in = new FileInputStream(source)) {
+        int remaining = ReadWriteIOUtils.readInt(in);
+        while (remaining-- > 0) {
+          IndexType type = IndexType.deserialize(ReadWriteIOUtils.readShort(in));
+          usableMap.put(type, IIndexUsable.Factory.deserializeIndexUsability(indexSeries, in));
+        }
+      } catch (IOException | IllegalPathException e) {
+        logger.error("Error when deserialize usability. Given up.", e);
+      }
+    }
+  }
+
+  /**
+   * Closes this processor once no index build task is running: every index instance except
+   * NoIndex is closed and its returned status buffer is kept, then the usability information and
+   * the status buffers are written to disk. Calling it on a closed processor is a no-op.
+   *
+   * @param deleteFiles if true, the whole directory of this IndexSeries is removed afterwards
+   * @throws IOException if closing an index instance fails
+   */
+  @SuppressWarnings("squid:S2589")
+  public synchronized void close(boolean deleteFiles) throws IOException {
+    if (closed) {
+      return;
+    }
+    waitingFlushEndAndDo(
+        () -> {
+          lock.writeLock().lock();
+          try {
+            // store Preprocessor
+            for (Entry<IndexType, IoTDBIndex> entry : allPathsIndexMap.entrySet()) {
+              IndexType indexType = entry.getKey();
+              if (indexType == IndexType.NO_INDEX) {
+                continue;
+              }
+              IoTDBIndex index = entry.getValue();
+              previousDataBufferMap.put(entry.getKey(), index.closeAndRelease());
+            }
+            logger.info("close and release index processor: {}", indexSeries);
+            allPathsIndexMap.clear();
+            serializeUsable();
+            serializePreviousBuffer();
+            closed = true;
+            if (deleteFiles) {
+              // The directory holds at least the files written just above, so a plain
+              // File.delete() on it can never succeed; remove its content first.
+              deleteRecursively(IndexUtils.getIndexFile(indexSeriesDirPath));
+            }
+          } finally {
+            lock.writeLock().unlock();
+          }
+        });
+  }
+
+  /** Deletes {@code file} and, if it is a directory, everything below it; failures are logged. */
+  private static void deleteRecursively(File file) {
+    File[] children = file.listFiles();
+    if (children != null) {
+      for (File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    if (file.exists() && !file.delete()) {
+      logger.warn("Failed to delete index file {}", file);
+    }
+  }
+
+  /**
+   * Polls until no index build task is running, then runs the given action. The polling interval
+   * starts at 100 ms; once the wait exceeds 3000 ms it becomes 1000 ms and a warning is logged on
+   * every further round.
+   *
+   * <p>If the waiting thread is interrupted, the action is NOT executed: the interrupt flag is
+   * restored so that callers can observe it, and the method returns.
+   *
+   * @param indexNaiveAction the action to run after all build tasks have finished
+   * @throws IOException if the action throws it
+   */
+  private void waitingFlushEndAndDo(IndexNaiveFunc indexNaiveAction) throws IOException {
+    long waitingInterval = 100;
+    long st = System.currentTimeMillis();
+    // wait the flushing end.
+    while (isFlushing()) {
+      try {
+        Thread.sleep(waitingInterval);
+      } catch (InterruptedException e) {
+        logger.error("interrupted, index insert may not complete.", e);
+        // do not swallow the interruption: let the caller's thread see it
+        Thread.currentThread().interrupt();
+        return;
+      }
+      long waitingTime = System.currentTimeMillis() - st;
+      // wait for too long time.
+      if (waitingTime > 3000) {
+        waitingInterval = 1000;
+        if (logger.isWarnEnabled()) {
+          logger.warn(
+              String.format(
+                  "IndexFileProcessor %s: wait-close time %d ms is too long.",
+                  indexSeries, waitingTime));
+        }
+      }
+    }
+    indexNaiveAction.act();
+  }
+
+  /** Returns the IndexSeries that this processor manages. */
+  public PartialPath getIndexSeries() {
+    return this.indexSeries;
+  }
+
+  /** Hashes on the IndexSeries only, the same field that {@link #compareTo} compares. */
+  @Override
+  public int hashCode() {
+    return this.indexSeries.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+
+    return compareTo((IndexProcessor) obj) == 0;
+  }
+
+  /** Renders the processor as {@code <indexSeries>: <map of index instances>}. */
+  @Override
+  public String toString() {
+    return String.valueOf(indexSeries) + ": " + String.valueOf(allPathsIndexMap);
+  }
+
+  @Override
+  public int compareTo(IndexProcessor o) {
+    return indexSeries.compareTo(o.indexSeries);
+  }
+
+  /** Returns true while the counter of running index build tasks is positive. */
+  private boolean isFlushing() {
+    int runningTasks = numIndexBuildTasks.get();
+    return runningTasks > 0;
+  }
+
+  /**
+   * Checks, under the processor write lock, that a memtable flush may start.
+   *
+   * @throws IndexRuntimeException if the processor is closed or build tasks are still running
+   */
+  void startFlushMemTable() {
+    lock.writeLock().lock();
+    try {
+      if (closed) {
+        throw new IndexRuntimeException("closed index file !!!!!");
+      }
+      boolean stillBuilding = numIndexBuildTasks.get() > 0;
+      if (stillBuilding) {
+        throw new IndexRuntimeException("There has been a flushing, do you want to wait?");
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Insert sorted series into index instances. For every index of this processor except NoIndex,
+   * one build task is submitted to the index build pool; the task feeds the series through the
+   * feature extractor of the index and inserts each extracted item.
+   *
+   * <p>{@code numIndexBuildTasks} is incremented once per submitted task and decremented when that
+   * task ends, so it reaches 0 exactly when all submitted tasks have finished, whatever the number
+   * of indexes is. A series whose data type differs from the type of this processor is skipped.
+   *
+   * @param path the path of time series
+   * @param tvList the sorted series
+   */
+  void buildIndexForOneSeries(PartialPath path, TVList tvList) {
+    // for every index of this path, submit a task to pool.
+    lock.writeLock().lock();
+    try {
+      if (tvList.getDataType() != tsDataType) {
+        logger.warn(
+            "TsDataType unmatched, ignore: indexSeries {}: {}, given series {}: {}",
+            indexSeries,
+            tsDataType,
+            path,
+            tvList.getDataType());
+        // the series is really ignored, as the message says
+        return;
+      }
+      allPathsIndexMap.forEach(
+          (indexType, index) -> {
+            // NO_INDEX doesn't involve the phase of building index
+            if (indexType == IndexType.NO_INDEX) {
+              return;
+            }
+            // resolve the lock once so that lock() and unlock() act on the same object
+            ReadWriteLock indexLock = indexLockMap.get(indexType);
+            Runnable buildTask =
+                () -> {
+                  // acquire before the try block: the finally must only unlock a held lock
+                  indexLock.writeLock().lock();
+                  try {
+                    IndexFeatureExtractor extractor = index.startFlushTask(path, tvList);
+                    while (extractor.hasNext()) {
+                      extractor.processNext();
+                      index.buildNext();
+                    }
+                    index.endFlushTask();
+                  } catch (IndexManagerException e) {
+                    // Give up the following data, but the previously serialized chunk will not be
+                    // affected.
+                    logger.error("build index failed", e);
+                  } catch (RuntimeException e) {
+                    logger.error("RuntimeException", e);
+                  } finally {
+                    indexLock.writeLock().unlock();
+                    numIndexBuildTasks.decrementAndGet();
+                  }
+                };
+            // count the task before it can run, and undo the count if it is never accepted
+            numIndexBuildTasks.incrementAndGet();
+            try {
+              indexBuildPoolManager.submit(buildTask);
+            } catch (RuntimeException e) {
+              numIndexBuildTasks.decrementAndGet();
+              throw e;
+            }
+          });
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /** Blocks until no index build task of this processor is running any more. */
+  void endFlushMemTable() {
+    // nothing has to be done afterwards, the call is only used for waiting
+    IndexNaiveFunc nothingToDo = () -> {};
+    try {
+      waitingFlushEndAndDo(nothingToDo);
+    } catch (IOException ignored) {
+      // the exception is ignored
+    }
+  }
+
+  /**
+   * According to the passed-in {@code indexInfoMap}, refresh the index instances in the current
+   * IndexProcessor. When an index is created or dropped, the IndexProcessor is out of date, so it
+   * needs to be refreshed.
+   *
+   * <p>Index types that are in {@code indexInfoMap} but not yet in this processor are constructed
+   * (with the status buffer saved for that type, if any). Index types that are no longer in
+   * {@code indexInfoMap} are closed and removed together with their usability information and
+   * their saved status buffer, so that a later re-creation of the same type starts clean.
+   *
+   * @param indexInfoMap passed from {@link IIndexRouter}
+   */
+  public void refreshSeriesIndexMapFromMManager(Map<IndexType, IndexInfo> indexInfoMap) {
+    lock.writeLock().lock();
+    try {
+      // Add indexes that are not in the previous map
+      for (Entry<IndexType, IndexInfo> entry : indexInfoMap.entrySet()) {
+        IndexType indexType = entry.getKey();
+        IndexInfo indexInfo = entry.getValue();
+        if (!allPathsIndexMap.containsKey(indexType)) {
+          IoTDBIndex index =
+              IndexType.constructIndex(
+                  indexSeries,
+                  tsDataType,
+                  getIndexDir(indexType),
+                  indexType,
+                  indexInfo,
+                  previousDataBufferMap.get(indexType));
+          allPathsIndexMap.putIfAbsent(indexType, index);
+          indexLockMap.putIfAbsent(indexType, new ReentrantReadWriteLock());
+          usableMap.putIfAbsent(
+              indexType, IIndexUsable.Factory.createEmptyIndexUsability(indexSeries));
+        }
+      }
+
+      // remove indexes that are removed from the previous map
+      for (IndexType indexType : new ArrayList<>(allPathsIndexMap.keySet())) {
+        if (!indexInfoMap.containsKey(indexType)) {
+          try {
+            allPathsIndexMap.get(indexType).closeAndRelease();
+          } catch (IOException e) {
+            // keep the stack trace, the message alone rarely tells where the close failed
+            logger.warn("Meet error when close {} before removing index", indexType, e);
+          }
+          // remove index file directories
+          File dir = IndexUtils.getIndexFile(getIndexDir(indexType));
+          if (dir.exists() && !dir.delete()) {
+            // File.delete() refuses non-empty directories; make the leftover visible
+            logger.warn("Failed to delete the directory {} of dropped index {}", dir, indexType);
+          }
+          allPathsIndexMap.remove(indexType);
+          usableMap.remove(indexType);
+          // the saved status belongs to the dropped instance and must not be reused or persisted
+          previousDataBufferMap.remove(indexType);
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * For unsequence data, subtracts the time range covered by {@code tvList} from the usable range
+   * of every {@link IIndexUsable} of this processor.
+   *
+   * @param path the path of the unsequence series
+   * @param tvList the unsequence data; only its minimum and last timestamps are read
+   */
+  void updateUnsequenceData(PartialPath path, TVList tvList) {
+    for (IIndexUsable usable : this.usableMap.values()) {
+      usable.minusUsableRange(path, tvList.getMinTime(), tvList.getLastTime());
+    }
+  }
+
+  /**
+   * index query.
+   *
+   * <p>The read lock of the queried index is acquired while the processor read lock is held, and
+   * is released only after the query has returned. It is released only if it was acquired, so a
+   * query on a missing index reports a {@link QueryIndexException} instead of failing while
+   * unlocking.
+   *
+   * @param indexType the type of index to be queried
+   * @param queryProps the query conditions
+   * @param context the query context
+   * @param alignedByTime true if result series need to be aligned by the timestamp.
+   * @return index query result
+   * @throws QueryIndexException if no index of the given type exists on this IndexSeries, or the
+   *     index query itself fails
+   */
+  public QueryDataSet query(
+      IndexType indexType,
+      Map<String, Object> queryProps,
+      QueryContext context,
+      boolean alignedByTime)
+      throws QueryIndexException {
+    ReadWriteLock indexLock;
+    lock.readLock().lock();
+    try {
+      indexLock = indexLockMap.get(indexType);
+      if (indexLock == null) {
+        throw new QueryIndexException(
+            String.format(
+                "%s hasn't been built on %s", indexType.toString(), indexSeries.getFullPath()));
+      }
+      indexLock.readLock().lock();
+    } finally {
+      lock.readLock().unlock();
+    }
+    try {
+      IoTDBIndex index = allPathsIndexMap.get(indexType);
+      if (index == null) {
+        // the lock entry of a dropped index may still exist while the instance is gone
+        throw new QueryIndexException(
+            String.format(
+                "%s hasn't been built on %s", indexType.toString(), indexSeries.getFullPath()));
+      }
+      return index.query(
+          queryProps, this.usableMap.get(indexType), context, refinePhaseOptimizer, alignedByTime);
+    } finally {
+      indexLock.readLock().unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
new file mode 100644
index 0000000..984e4ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/IoTDBIndex.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.common.DistSeries;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * For index developers, the indexing framework aims to provide a simple and friendly platform and
+ * shield the complex details in other modules.
+ *
+ * <p>To add a new index method, developers need to inherit {@linkplain IoTDBIndex} or one of its
+ * subclasses.
+ */
+public abstract class IoTDBIndex {
+
+  protected final PartialPath indexSeries;
+  protected final IndexType indexType;
+  protected final TSDataType tsDataType;
+
+  protected final Map<String, String> props;
+  protected IndexFeatureExtractor indexFeatureExtractor;
+
+  /**
+   * Stores the series, the data type, and the index type and properties taken from {@code
+   * indexInfo}.
+   *
+   * @param indexSeries the IndexSeries this index acts on
+   * @param tsDataType the data type of the indexed series
+   * @param indexInfo the descriptor providing the index type and the index properties
+   */
+  public IoTDBIndex(PartialPath indexSeries, TSDataType tsDataType, IndexInfo indexInfo) {
+    this.indexSeries = indexSeries;
+    this.tsDataType = tsDataType;
+    IndexType typeOfIndex = indexInfo.getIndexType();
+    Map<String, String> indexProps = indexInfo.getProps();
+    this.indexType = typeOfIndex;
+    this.props = indexProps;
+  }
+
+  /**
+   * An index should determine which FeatureExtractor it uses and hook it to {@linkplain
+   * IoTDBIndex}.indexFeatureExtractor. This method is called when IoTDBIndex is created.
+   *
+   * @param previous the status data saved in the last closing of the FeatureExtractor
+   * @param inQueryMode true if it's during index query, false if it's during index building
+   */
+  public abstract void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode);
+
+  /**
+   * A new item has been pre-processed by the FeatureExtractor, now the index can insert it.
+   *
+   * @return NOTE(review): the meaning of the returned flag is not documented here; confirm it
+   *     against the implementations
+   * @throws IndexManagerException presumably when the item cannot be inserted; confirm against
+   *     the implementations
+   */
+  public abstract boolean buildNext() throws IndexManagerException;
+
+  /**
+   * This index will be closed, it's time to serialize in-memory data to disk for next open. It is
+   * the first step of {@link #closeAndRelease()}.
+   */
+  protected abstract void flushIndex();
+
+  /**
+   * Execute an index query and return the result.
+   *
+   * @param queryProps query conditions
+   * @param iIndexUsable the information of index usability
+   * @param context query context provided by IoTDB.
+   * @param candidateOrderOptimize an optimizer for the order of visiting candidates
+   * @param alignedByTime true if the result series need to be aligned by timestamp, otherwise
+   *     they will be aligned by their first points
+   * @return the result should be consistent with other IoTDB query result.
+   * @throws QueryIndexException if the query cannot be executed
+   */
+  public abstract QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexCandidateOrderOptimize candidateOrderOptimize,
+      boolean alignedByTime)
+      throws QueryIndexException;
+
+  /**
+   * In current design, the index building (insert data) only occurs in the memtable flush. When
+   * this method is called, a batch of raw data is coming; it is appended to the feature extractor
+   * of this index.
+   *
+   * @param partialPath the path of the incoming series; not used by this base implementation
+   * @param tvList tvList to insert
+   * @return FeatureExtractor filled with the given raw data
+   */
+  public IndexFeatureExtractor startFlushTask(PartialPath partialPath, TVList tvList) {
+    indexFeatureExtractor.appendNewSrcData(tvList);
+    return this.indexFeatureExtractor;
+  }
+
+  /** The flush task has ended: lets the feature extractor clear its processed source data. */
+  public void endFlushTask() {
+    this.indexFeatureExtractor.clearProcessedSrcData();
+  }
+
+  /**
+   * Close the index: flush the index structure, then close the feature extractor if there is one.
+   *
+   * @return the buffer returned by the feature extractor on closing, or an empty buffer when this
+   *     index has no feature extractor
+   * @throws IOException declared for closing failures
+   */
+  public ByteBuffer closeAndRelease() throws IOException {
+    flushIndex();
+    if (indexFeatureExtractor == null) {
+      return ByteBuffer.allocate(0);
+    }
+    return indexFeatureExtractor.closeAndRelease();
+  }
+
+  /** Returns the data type of the series this index acts on. */
+  public TSDataType getTsDataType() {
+    return this.tsDataType;
+  }
+
+  /** Returns the type of this index. */
+  public IndexType getIndexType() {
+    return this.indexType;
+  }
+
+  /** Renders the index as the string form of its index type. */
+  @Override
+  public String toString() {
+    return this.indexType.toString();
+  }
+
+  /**
+   * Builds the query result from the given series, limited to the maximum index query result size
+   * read from the IoTDB configuration.
+   *
+   * @param res the result series
+   * @param alignedByTime passed through to the three-argument overload
+   * @return the data set built by the three-argument overload
+   * @throws QueryIndexException if the three-argument overload rejects the request
+   */
+  protected QueryDataSet constructSearchDataset(List<DistSeries> res, boolean alignedByTime)
+      throws QueryIndexException {
+    int configuredLimit = IoTDBDescriptor.getInstance().getConfig().getMaxIndexQueryResultSize();
+    return constructSearchDataset(res, alignedByTime, configuredLimit);
+  }
+
+  protected QueryDataSet constructSearchDataset(
+      List<DistSeries> res, boolean alignedByTime, int nMaxReturnSeries)
+      throws QueryIndexException {
+    if (alignedByTime) {
+      throw new QueryIndexException("Unsupported alignedByTime result");
+    }
+    // make result paths and types
+    nMaxReturnSeries = Math.min(nMaxReturnSeries, res.size());
+    List<PartialPath> paths = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    Map<String, Integer> pathToIndex = new HashMap<>();
+    int nMinLength = Integer.MAX_VALUE;
+    for (int i = 0; i < nMaxReturnSeries; i++) {
+      PartialPath series = res.get(i).partialPath;
+      paths.add(series);
+      pathToIndex.put(series.getFullPath(), i);
+      types.add(tsDataType);
+      if (res.get(i).tvList.size() < nMinLength) {
+        nMinLength = res.get(i).tvList.size();
+      }
+    }
+    IndexQueryDataSet dataSet = new IndexQueryDataSet(paths, types, pathToIndex);
+    if (nMaxReturnSeries == 0) {
+      return dataSet;
+    }
+    for (int row = 0; row < nMinLength; row++) {
+      RowRecord record = new RowRecord(row);
+      for (int col = 0; col < nMaxReturnSeries; col++) {
+        TVList tvList = res.get(col).tvList;
+        record.addField(IndexUtils.getValue(tvList, row), tsDataType);
+      }
+      dataSet.putRecord(record);
+    }
+    return dataSet;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
new file mode 100644
index 0000000..1e9ff88
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/NoIndex.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.algorithm;
+
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
+import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
+import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * NoIndex does nothing on feature extracting and data pruning. Its index-available range is
+ * always empty.
+ */
+public class NoIndex extends IoTDBIndex {
+
+  /** Only forwards its arguments to {@link IoTDBIndex}. */
+  public NoIndex(PartialPath path, TSDataType tsDataType, IndexInfo indexInfo) {
+    super(path, tsDataType, indexInfo);
+  }
+
+  /** Installs no feature extractor: NoIndex does nothing here. */
+  @Override
+  public void initFeatureExtractor(ByteBuffer previous, boolean inQueryMode) {
+    // NoIndex does nothing
+  }
+
+  /** Inserts nothing and always returns true. */
+  @Override
+  public boolean buildNext() {
+    return true;
+  }
+
+  /** Has nothing to write: NoIndex does nothing here. */
+  @Override
+  protected void flushIndex() {
+    // NoIndex does nothing
+  }
+
+  /** Ignores all arguments and returns a data set without paths, types and path mapping. */
+  @Override
+  public QueryDataSet query(
+      Map<String, Object> queryProps,
+      IIndexUsable iIndexUsable,
+      QueryContext context,
+      IIndexCandidateOrderOptimize candidateOrderOptimize,
+      boolean alignedByTime) {
+    QueryDataSet emptyResult =
+        new IndexQueryDataSet(
+            Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+    return emptyResult;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
new file mode 100644
index 0000000..aa50be1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/DistSeries.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+/**
+ * A plain holder that bundles a series (its path and its data) with a {@code dist} value.
+ * NOTE(review): {@code dist} is presumably the distance between the series and the query pattern;
+ * confirm against the index implementations.
+ */
+public class DistSeries {
+
+  /** The value attached to the series; see the class comment for its presumed meaning. */
+  public double dist;
+
+  /** The data points of the series. */
+  public TVList tvList;
+
+  /** The path of the series. */
+  public PartialPath partialPath;
+
+  public DistSeries(double dist, TVList tvList, PartialPath partialPath) {
+    this.dist = dist;
+    this.tvList = tvList;
+    this.partialPath = partialPath;
+  }
+
+  /** Renders the holder as {@code (<path>,<dist>:<tvList>)}. */
+  @Override
+  public String toString() {
+    return "(" + partialPath + "," + dist + ":" + tvList + ")";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
index d971aa7..3c0cd32 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexConstant.java
@@ -19,18 +19,72 @@ package org.apache.iotdb.db.index.common;
 
 public class IndexConstant {
 
+  // SQL show
+  public static final String ID = "ID";
+  public static final String NON_IMPLEMENTED_MSG = "Not implemented yet";
+
+  public static final String ROUTER_DIR = "router";
+  public static final String META_DIR_NAME = "index_meta";
+  public static final String INDEX_DATA_DIR_NAME = "index_data";
+  public static final String STORAGE_GROUP_INDEXING_SUFFIX = ".sg_indexing";
+  public static final String STORAGE_GROUP_INDEXED_SUFFIX = ".sg_index";
+
+  public static final String INDEXING_SUFFIX = ".indexing";
+  public static final String INDEXED_SUFFIX = ".index";
+
   // whole matching
   public static final int NON_SET_TOP_K = -1;
   public static final String TOP_K = "TOP_K";
 
+  // subsequence matching: sliding window
+  public static final String INDEX_WINDOW_RANGE = "INDEX_WINDOW_RANGE";
+  public static final String INDEX_RANGE_STRATEGY = "INDEX_RANGE_STRATEGY";
+  public static final String INDEX_SLIDE_STEP = "INDEX_SLIDE_STEP";
+
+  public static final String INDEX_MAGIC = "IoTDBIndex";
+  public static final String DEFAULT_PROP_NAME = "DEFAULT";
+
+  public static final int INDEX_MAP_INIT_RESERVE_SIZE = 5;
+
   public static final String PATTERN = "PATTERN";
   public static final String THRESHOLD = "THRESHOLD";
+  public static final String BORDER = "BORDER";
+
+  // MBR Index parameters
+  public static final String FEATURE_DIM = "FEATURE_DIM";
+  public static final String DEFAULT_FEATURE_DIM = "4";
+  public static final String SEED_PICKER = "SEED_PICKER";
+  public static final String MAX_ENTRIES = "MAX_ENTRIES";
+  public static final String MIN_ENTRIES = "MIN_ENTRIES";
 
   // RTree PAA parameters
   public static final String PAA_DIM = "PAA_DIM";
+  public static final String SERIES_LENGTH = "SERIES_LENGTH";
+  public static final String DEFAULT_SERIES_LENGTH = "16";
+  public static final String DEFAULT_RTREE_PAA_DISTANCE = "2";
+
+  // Distance
+  public static final String DISTANCE = "DISTANCE";
+  public static final String L_INFINITY = "L_INFINITY";
+  public static final String DEFAULT_DISTANCE = "2";
+
+  // ELB Type
+  public static final String ELB_TYPE = "ELB_TYPE";
+  public static final String ELB_TYPE_ELE = "ELE";
+  public static final String ELB_TYPE_SEQ = "SEQ";
+  public static final String DEFAULT_ELB_TYPE = "SEQ";
+  public static final int DEFAULT_BLOCK_SIZE = 20;
 
   // ELB: calc param
   public static final String BLOCK_SIZE = "BLOCK_SIZE";
+  public static final String ELB_CALC_PARAM = "ELB_CALC_PARAM";
+  public static final String DEFAULT_ELB_CALC_PARAM = "SINGLE";
+  public static final String ELB_CALC_PARAM_SINGLE = "SINGLE";
+  public static final String ELB_THRESHOLD_BASE = "ELB_THRESHOLD_BASE";
+  public static final String ELB_THRESHOLD_RATIO = "ELB_THRESHOLD_RATIO";
+  public static final double ELB_DEFAULT_THRESHOLD_RATIO = 0.1;
+
+  public static final String MISSING_PARAM_ERROR_MESSAGE = "missing parameter: %s";
 
   private IndexConstant() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
new file mode 100644
index 0000000..e2e67ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexInfo.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.metadata.MetadataOperationType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IndexInfo implements Cloneable {
+  // props may be null: deserializeCreateIndex passes null when no key=value arguments are present
+  private Map<String, String> props;
+  private long time;
+  private IndexType indexType;
+
+  public IndexInfo(IndexType indexType, long time, Map<String, String> props) {
+    this.props = props;
+    this.time = time;
+    this.indexType = indexType;
+  }
+
+  public Map<String, String> getProps() {
+    return props;
+  }
+
+  public void setProps(Map<String, String> props) {
+    this.props = props;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public void setTime(long time) {
+    this.time = time;
+  }
+
+  public void setIndexType(IndexType indexType) {
+    this.indexType = indexType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  @Deprecated
+  public String serializeCreateIndex(String path) {
+    StringBuilder res = new StringBuilder();
+    res.append(
+        String.format(
+            "%s,%s,%s,%s", MetadataOperationType.CREATE_INDEX, path, indexType.serialize(), time));
+    if (props != null && !props.isEmpty()) {
+      for (Map.Entry<String, String> entry : props.entrySet()) {
+        res.append(String.format(",%s=%s", entry.getKey(), entry.getValue()));
+      }
+    }
+    return res.toString();
+  }
+
+  /**
+   * @param args [0] is the MetadataType, [1] is the path, the rest is to be parsed.
+   * @return parsed IndexInfo
+   */
+  @Deprecated
+  public static IndexInfo deserializeCreateIndex(String[] args) {
+    IndexType indexType = IndexType.deserialize(Short.parseShort(args[2]));
+    long time = Long.parseLong(args[3]);
+    HashMap<String, String> indexProps = null;
+    if (args.length > 4) {
+      String[] kv;
+      indexProps = new HashMap<>(args.length - 4 + 1, 1);
+      for (int k = 4; k < args.length; k++) {
+        kv = args[k].split("=", 2); // limit 2: a value may itself contain '=' or be empty
+        indexProps.put(kv[0], kv[1]);
+      }
+    }
+    return new IndexInfo(indexType, time, indexProps);
+  }
+
+  @Deprecated
+  public static String serializeDropIndex(String path, IndexType indexType) {
+    return String.format("%s,%s,%s", MetadataOperationType.DROP_INDEX, path, indexType.serialize());
+  }
+
+  /**
+   * @param args [0] is the MetadataType, [1] is the path, [2] is the serialized index type.
+   * @return parsed IndexType
+   */
+  @Deprecated
+  public static IndexType deserializeDropIndex(String[] args) {
+    return IndexType.deserialize(Short.parseShort(args[2]));
+  }
+
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(indexType.serialize(), outputStream);
+    ReadWriteIOUtils.write(time, outputStream);
+    ReadWriteIOUtils.write(props, outputStream);
+  }
+
+  public static IndexInfo deserialize(InputStream inputStream) throws IOException {
+    short indexTypeShort = ReadWriteIOUtils.readShort(inputStream);
+    IndexType indexType = IndexType.deserialize(indexTypeShort);
+    long time = ReadWriteIOUtils.readLong(inputStream);
+    Map<String, String> indexProps = ReadWriteIOUtils.readMap(inputStream);
+    return new IndexInfo(indexType, time, indexProps);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("[type: %s, time: %d, props: %s]", indexType, time, props);
+  }
+
+  @Override
+  public Object clone() {
+    return new IndexInfo(indexType, time, props == null ? null : new HashMap<>(props));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
new file mode 100644
index 0000000..ae3a875
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexProcessorStruct.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class IndexProcessorStruct {
+  // Plain holder grouping an IndexProcessor, its representativePath and the infos per IndexType.
+  public IndexProcessor processor;
+  public PartialPath representativePath;
+  public Map<IndexType, IndexInfo> infos;
+
+  public IndexProcessorStruct(
+      IndexProcessor processor, PartialPath representativePath, Map<IndexType, IndexInfo> infos) {
+    this.processor = processor;
+    this.representativePath = representativePath;
+    this.infos = infos;
+  }
+
+  public List<StorageGroupProcessor> addMergeLock() throws StorageEngineException {
+    return StorageEngine.getInstance().mergeLock(Collections.singletonList(representativePath));
+  }
+
+  @Override
+  public String toString() {
+    return "<" + infos + "\n" + processor + ">";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
index 177e7bd..ec94158 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexType.java
@@ -19,13 +19,20 @@
 package org.apache.iotdb.db.index.common;
 
 import org.apache.iotdb.db.exception.index.UnsupportedIndexTypeException;
+import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
+import org.apache.iotdb.db.index.algorithm.NoIndex;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 public enum IndexType {
   NO_INDEX,
   RTREE_PAA,
-  ELB_INDEX,
-  KV_INDEX;
+  ELB_INDEX;
 
   /**
    * judge the index type.
@@ -41,17 +48,11 @@ public enum IndexType {
         return RTREE_PAA;
       case 2:
         return ELB_INDEX;
-      case 3:
-        return KV_INDEX;
       default:
         throw new NotImplementedException("Given index is not implemented");
     }
   }
 
-  public static int getSerializedSize() {
-    return Short.BYTES;
-  }
-
   /**
    * judge the index deserialize type.
    *
@@ -65,8 +66,6 @@ public enum IndexType {
         return 1;
       case ELB_INDEX:
         return 2;
-      case KV_INDEX:
-        return 3;
       default:
         throw new NotImplementedException("Given index is not implemented");
     }
@@ -81,4 +80,41 @@ public enum IndexType {
       throw new UnsupportedIndexTypeException(indexTypeString);
     }
   }
+
+  private static IoTDBIndex newIndexByType(
+      PartialPath path,
+      TSDataType tsDataType,
+      String indexDir,
+      IndexType indexType,
+      IndexInfo indexInfo) {
+    switch (indexType) {
+      case NO_INDEX:
+        return new NoIndex(path, tsDataType, indexInfo);
+      default: // ELB_INDEX and RTREE_PAA have no implementation wired in yet
+        throw new NotImplementedException("unsupported index type:" + indexType);
+    }
+  }
+
+  /** Upper-cases indexInfo's props (mutating indexInfo), builds the index, inits its extractor. */
+  public static IoTDBIndex constructIndex(
+      PartialPath indexSeries,
+      TSDataType tsDataType,
+      String indexDir,
+      IndexType indexType,
+      IndexInfo indexInfo,
+      ByteBuffer previous) {
+    indexInfo.setProps(uppercaseStringProps(indexInfo.getProps()));
+    IoTDBIndex index = newIndexByType(indexSeries, tsDataType, indexDir, indexType, indexInfo);
+    index.initFeatureExtractor(previous, false);
+    return index;
+  }
+
+  private static Map<String, String> uppercaseStringProps(Map<String, String> props) {
+    java.util.Locale root = java.util.Locale.ROOT; // casing independent of the default locale
+    Map<String, String> uppercase = new HashMap<>();
+    if (props != null) { // an IndexInfo may carry no property map at all
+      props.forEach((k, v) -> uppercase.put(k.toUpperCase(root), v.toUpperCase(root)));
+    }
+    return uppercase;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
index 98809f0..f930cff 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
@@ -17,8 +17,40 @@
  */
 package org.apache.iotdb.db.index.common;
 
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class IndexUtils {
 
+  /**
+   * Judges whether fullPath matches pathWithStar, comparing the two node by node. A "*" node in
+   * pathWithStar matches any node at the same position; paths with different node counts never
+   * match. Refer to {@code org.apache.iotdb.db.metadata.MManager#match}
+   *
+   * @param pathWithStar a path with wildcard characters.
+   * @param fullPath a full path without wildcard characters.
+   * @return true if fullPath matches pathWithStar.
+   */
+  public static boolean match(PartialPath pathWithStar, PartialPath fullPath) {
+    String[] fullNodes = fullPath.getNodes();
+    String[] starNodes = pathWithStar.getNodes();
+    // a length mismatch settles the answer up front; the loop below then never runs
+    boolean matched = starNodes.length == fullNodes.length;
+    for (int level = 0; matched && level < fullNodes.length; level++) {
+      String pattern = starNodes[level];
+      // stop at the first node that is neither a wildcard nor equal to its counterpart
+      matched = "*".equals(pattern) || fullNodes[level].equals(pattern);
+    }
+    return matched;
+  }
+
   public static String removeQuotation(String v) {
     int start = 0;
     int end = v.length();
@@ -31,5 +63,48 @@ public class IndexUtils {
     return v.substring(start, end);
   }
 
+  public static File getIndexFile(String filePath) {
+    return SystemFileFactory.INSTANCE.getFile(filePath); // obtained via SystemFileFactory
+  }
+
   private IndexUtils() {}
+
+  /** Despite its name, returns a copy in which every key and String value is UPPER-cased. */
+  public static Map<String, Object> toLowerCaseProps(Map<String, Object> props) {
+    java.util.Locale root = java.util.Locale.ROOT; // casing independent of the default locale
+    Map<String, Object> uppercase = new HashMap<>(props.size());
+    for (Entry<String, Object> entry : props.entrySet()) {
+      Object v = entry.getValue();
+      if (v instanceof String) {
+        v = ((String) v).toUpperCase(root);
+      }
+      uppercase.put(entry.getKey().toUpperCase(root), v);
+    }
+    return uppercase;
+  }
+
+  public static Object getValue(TVList srcData, int idx) {
+    switch (srcData.getDataType()) { // each supported type is returned boxed in its own type
+      case INT32:
+        return srcData.getInt(idx);
+      case INT64:
+        return srcData.getLong(idx);
+      case FLOAT:
+        return srcData.getFloat(idx);
+      case DOUBLE:
+        return srcData.getDouble(idx); // no float cast: narrowing would lose precision
+      default:
+        throw new NotImplementedException(srcData.getDataType().toString());
+    }
+  }
+
+  /**
+   * "*" is illegal in a Windows directory path, so every "*" is replaced with "#".
+   *
+   * @param previousDir path which may contain "*"
+   * @return the path with each "*" replaced by "#"
+   */
+  public static String removeIllegalStarInDir(String previousDir) {
+    return previousDir.replace('*', '#');
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
new file mode 100644
index 0000000..3ebca7c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/CreateIndexProcessorFunc.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.util.Map;
+
+/**
+ * Produces an {@link IndexProcessor} from an index series and that series' index infos.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act(PartialPath indexSeries, Map indexInfoMap)}.
+ */
+@FunctionalInterface
+public interface CreateIndexProcessorFunc {
+
+  /** Returns the IndexProcessor for {@code indexSeries} given its per-type {@code indexInfoMap}. */
+  IndexProcessor act(PartialPath indexSeries, Map<IndexType, IndexInfo> indexInfoMap);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
new file mode 100644
index 0000000..6641f60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/func/IndexNaiveFunc.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.common.func;
+
+import java.io.IOException;
+
+/**
+ * An action that takes no input and returns no output, but may fail with an IOException.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a> whose functional method is
+ * {@link #act()}.
+ */
+@FunctionalInterface
+public interface IndexNaiveFunc {
+
+  /** Runs the action; implementations may throw IOException. */
+  void act() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
new file mode 100644
index 0000000..64baed2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/Randomwalk.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.common.math;
+
+import org.apache.iotdb.db.index.common.math.probability.UniformProba;
+import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Random;
+
+public class Randomwalk {
+  /** Walk starting at R; for positive miu each step is uniform in [-miu/2, miu/2). */
+  public static TVList generateRanWalkTVList(long length, long seed, float R, float miu) {
+    TVList res = TVListAllocator.getInstance().allocate(TSDataType.DOUBLE);
+    double lastPoint = R;
+    UniformProba uniform = new UniformProba(miu / 2, -miu / 2, new Random(seed));
+    // long counter: an int index would overflow and never terminate past Integer.MAX_VALUE
+    for (long time = 0; time < length; time++) {
+      lastPoint = lastPoint + uniform.getNextRandom();
+      res.putDouble(time, lastPoint); // timestamps are simply 0..length-1
+    }
+    return res;
+  }
+
+  public static TVList generateRanWalkTVList(long length) {
+    return generateRanWalkTVList(length, 0, 0, 1); // seed 0, start value 0, step width 1
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
index 7b8b8cf..044f915 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/Probability.java
@@ -15,15 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+public abstract class Probability {
 
-public class QueryIndexException extends QueryProcessException {
-
-  private static final long serialVersionUID = 9019233783504576296L;
-
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
-  }
+  public abstract double getNextRandom(); // next random value produced by the implementation
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
copy to server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
index 7b8b8cf..909d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/index/QueryIndexException.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/math/probability/UniformProba.java
@@ -15,15 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.exception.index;
+package org.apache.iotdb.db.index.common.math.probability;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import java.util.Random;
 
-public class QueryIndexException extends QueryProcessException {
+public class UniformProba extends Probability {
 
-  private static final long serialVersionUID = 9019233783504576296L;
+  private final double upBound; // stored, but not read by getNextRandom
+  private final double downBound; // offset added to every generated value
+  private final double range; // upBound - downBound, computed once in the constructor
+  private final Random r; // caller-supplied generator; assigned once, hence final
 
-  public QueryIndexException(String message, int errorCode) {
-    super(message, errorCode);
+  public UniformProba(double upBound, double downBound, Random r) {
+    assert upBound > downBound; // only checked when the JVM runs with assertions enabled (-ea)
+    this.upBound = upBound;
+    this.downBound = downBound;
+    this.range = upBound - downBound;
+    this.r = r;
+  }
+
+  @Override
+  public double getNextRandom() {
+    return r.nextDouble() * range + downBound; // scales nextDouble()'s [0, 1) by range, then shifts
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
new file mode 100644
index 0000000..d1faa1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/feature/IndexFeatureExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.feature;
+
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.NON_IMPLEMENTED_MSG;
+
+/**
+ * For every index, the raw input sequence has to be pre-processed before the index organizes it.
+ * In general, an index structure need not keep all of the original data, only pointers (e.g. the
+ * identifier [start_time, end_time, series_path] can identify a time sequence uniquely).
+ *
+ * <p>By and large, a similarity index assumes that the input data are ideal: fixed dimension,
+ * equal intervals, no missing values and no outliers. In real scenarios, however, the input series
+ * may contain missing values (so its dimension is not fixed) and timestamps may be slightly
+ * offset (so the points are not equally spaced). An IndexFeatureExtractor needs to preprocess the
+ * series and obtain a clean series to insert.
+ *
+ * <p>Many indexes further extract features from the aligned sequences, such as PAA, SAX, FFT, etc.
+ *
+ * <p>In summary, an IndexFeatureExtractor can provide three levels of information:
+ *
+ * <ul>
+ *   <li>L1: a triplet to identify a series: {@code {StartTime, EndTime, Length}} (not submitted in
+ *       this PR)
+ *   <li>L2: aligned sequence: {@code {a1, a2, ..., an}}
+ *   <li>L3: feature: {@code {C1, C2, ..., Cm}}
+ * </ul>
+ */
+public abstract class IndexFeatureExtractor {
+
+  /** In the BUILD and QUERY modes, the IndexFeatureExtractor may work differently. */
+  protected boolean inQueryMode;
+
+  IndexFeatureExtractor(boolean inQueryMode) {
+    this.inQueryMode = inQueryMode;
+  }
+
+  /**
+   * Inputs a list of new data into this extractor.
+   *
+   * @param newData newly arrived data.
+   */
+  public abstract void appendNewSrcData(TVList newData);
+
+  /**
+   * Inputs a list of new data into this extractor. Because of the existing code, IoTDB produces
+   * a TVList in the insert phase but obtains BatchData in the query phase.
+   *
+   * @param newData newly arrived data.
+   */
+  public abstract void appendNewSrcData(BatchData newData);
+
+  /** After {@code appendNewSrcData}, the index framework will check {@code hasNext}. */
+  public abstract boolean hasNext();
+
+  /**
+   * If {@code hasNext} returns true, the index framework will call {@code processNext}, which
+   * prepares a new series item (ready for insertion).
+   */
+  public abstract void processNext();
+
+  /**
+   * After processing a batch of data, the index framework may call {@code clearProcessedSrcData} to
+   * clean out the processed data and release memory.
+   */
+  public abstract void clearProcessedSrcData();
+
+  /**
+   * Closes this extractor and releases all allocated resources (TVList and PrimitiveList). It also
+   * returns the data saved for the next open.
+   *
+   * @return the data saved for the next open.
+   */
+  public abstract ByteBuffer closeAndRelease() throws IOException;
+
+  /** @return current L2: aligned sequence. Not implemented here: this base version throws. */
+  public Object getCurrent_L2_AlignedSequence() {
+    throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+  }
+
+  /** @return current L3: customized feature. Not implemented here: this base version throws. */
+  public Object getCurrent_L3_Feature() {
+    throw new UnsupportedOperationException(NON_IMPLEMENTED_MSG);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
new file mode 100644
index 0000000..8d95171
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/IndexQueryDataSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class IndexQueryDataSet extends ListDataSet {
+  // Map handed in by the creator and exposed unchanged through getPathToIndex().
+  private final Map<String, Integer> pathToIndex;
+
+  public IndexQueryDataSet(
+      List<PartialPath> paths, List<TSDataType> dataTypes, Map<String, Integer> pathToIndex) {
+    super(paths, dataTypes);
+    this.pathToIndex = pathToIndex;
+  }
+
+  public Map<String, Integer> getPathToIndex() {
+    return pathToIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
new file mode 100644
index 0000000..c7f07a5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/QueryIndexExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.index.read;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.IndexUtils;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.util.List;
+import java.util.Map;
+
+/** the executor for index query */
+public class QueryIndexExecutor {
+
+  private final Map<String, Object> queryProps;
+  private final IndexType indexType;
+  private final QueryContext context;
+  private final List<PartialPath> paths;
+  private final boolean alignedByTime;
+
+  public QueryIndexExecutor(QueryIndexPlan queryIndexPlan, QueryContext context) {
+    this.indexType = queryIndexPlan.getIndexType();
+    this.queryProps = IndexUtils.toLowerCaseProps(queryIndexPlan.getProps());
+    this.paths = queryIndexPlan.getPaths();
+    this.alignedByTime = queryIndexPlan.isAlignedByTime();
+    this.paths.forEach(PartialPath::toLowerCase);
+    this.context = context;
+  }
+
+  public QueryDataSet executeIndexQuery() throws StorageEngineException, QueryIndexException {
+    // get all related storage group
+    return IndexManager.getInstance()
+        .queryIndex(paths, indexType, queryProps, context, alignedByTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
new file mode 100644
index 0000000..c9dcaee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/IIndexCandidateOrderOptimize.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+/**
+ * In the indexing mechanism, it refers to the optimization of the access order of candidate series
+ * after the pruning phase. Due to the mismatch between the candidate set order given by the index
+ * and the file organization of TsFile, the access of the refinement phase advised by the index may
+ * be inefficient. We can use this query optimizer to rearrange the candidate set access order.
+ *
+ * <p>The interface currently declares no methods; it only hosts the {@link Factory}.
+ */
+public interface IIndexCandidateOrderOptimize {
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer
+    }
+
+    /** @return a new {@link NoCandidateOrderOptimizer} on every call */
+    public static IIndexCandidateOrderOptimize getOptimize() {
+      return new NoCandidateOrderOptimizer();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
new file mode 100644
index 0000000..0354bf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/read/optimize/NoCandidateOrderOptimizer.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.read.optimize;
+
+/**
+ * The implementation returned by {@link IIndexCandidateOrderOptimize.Factory#getOptimize()}. It
+ * adds no members of its own, i.e. it performs no reordering of candidates.
+ */
+class NoCandidateOrderOptimizer implements IIndexCandidateOrderOptimize {}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
new file mode 100644
index 0000000..bc6cfdd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/IIndexRouter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Singleton pattern.
+ *
+ * <p>NOTE(review): {@link Factory#getIndexRouter(String)} builds a new router on every call, so the
+ * single-instance property presumably has to be ensured by the caller - confirm.
+ *
+ * <p>Firstly, IIndexRouter is responsible for index metadata management. More importantly, it is
+ * for routing the create/drop/insert/query command to corresponding index processors.
+ *
+ * <p>IIndexRouter can decouple the mapping relationship, which may be re-designed in future,
+ * between {@link org.apache.iotdb.db.index.algorithm.IoTDBIndex IoTDBIndex} and {@link
+ * IndexProcessor} from {@link org.apache.iotdb.db.index.IndexManager IndexManager}.
+ */
+public interface IIndexRouter {
+
+  /**
+   * add a new index into the router.
+   *
+   * @param prefixPath the partial path of given index series
+   * @param indexInfo the index information.
+   * @param func a function to create a new IndexProcessor, if it's not created before.
+   * @param doSerialize true to serialize the new information immediately.
+   * @return true if adding index information successfully
+   * @throws MetadataException if the index cannot be registered
+   */
+  boolean addIndexIntoRouter(
+      PartialPath prefixPath,
+      IndexInfo indexInfo,
+      CreateIndexProcessorFunc func,
+      boolean doSerialize)
+      throws MetadataException;
+
+  /**
+   * remove an existing index from the router.
+   *
+   * @param prefixPath the partial path of given index series
+   * @param indexType the type of index to be removed.
+   * @return true if removing index information successfully
+   * @throws MetadataException if the index cannot be unregistered
+   * @throws IOException if an I/O error occurs while removing the index
+   */
+  boolean removeIndexFromRouter(PartialPath prefixPath, IndexType indexType)
+      throws MetadataException, IOException;
+
+  /**
+   * look up the indexes registered on one index series.
+   *
+   * @param indexSeries the index series (full path or path with wildcards)
+   * @return the index information of that series, keyed by index type
+   */
+  Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries);
+
+  /** @return every registered index processor together with its index information */
+  Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo();
+
+  /**
+   * find the index processors responsible for a time series.
+   *
+   * @param timeSeries the path of a time series
+   * @return the processors whose index series covers the given time series
+   */
+  Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries);
+
+  /**
+   * serialize all index information and processors to the disk
+   *
+   * @param doClose true if close processors after serialization.
+   */
+  void serialize(boolean doClose);
+
+  /**
+   * deserialize all index information and processors into the memory
+   *
+   * @param func a function to create the IndexProcessor of each reloaded index series
+   */
+  void deserializeAndReload(CreateIndexProcessorFunc func);
+
+  /**
+   * return a subset of the original IIndexRouter for accessing concurrency
+   *
+   * @param storageGroupPath the path of a storageGroup
+   * @return a subset of the original IIndexRouter
+   */
+  IIndexRouter getRouterByStorageGroup(String storageGroupPath);
+
+  /** @return the number of index series managed by this router */
+  int getIndexNum();
+
+  /**
+   * prepare necessary information for index query
+   *
+   * @param partialPath the query path
+   * @param indexType the index type
+   * @param context the query context
+   * @return the necessary information for this query
+   * @throws QueryIndexException if the query cannot be served for the given path
+   */
+  IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException;
+
+  /**
+   * do something when the query ends
+   *
+   * @param indexSeries the query path
+   * @param indexType the index type
+   * @param context the query context
+   */
+  void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context);
+
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer
+    }
+
+    /**
+     * @param routerDir the directory handed to the {@link ProtoIndexRouter} constructor
+     * @return a new {@link ProtoIndexRouter} on every call
+     */
+    public static IIndexRouter getIndexRouter(String routerDir) {
+      return new ProtoIndexRouter(routerDir);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
new file mode 100644
index 0000000..0892758
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/router/ProtoIndexRouter.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexProcessorStruct;
+import org.apache.iotdb.db.index.common.IndexType;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A proto implementation.
+ *
+ * <p>The subsequence-matching index is created on a single time series, while the whole-matching
+ * index is created on a group of time series with wildcards, so {@code ProtoIndexRouter} manages
+ * the two cases with different Map structures.
+ *
+ * <p>A key function of {@code IIndexRouter} is to quickly route to the IndexProcessor for a given
+ * series path. If the path is a full path, it can be found in O(1) in {@code
+ * fullPathProcessorMap}; otherwise, every key in {@code wildCardProcessorMap} must be traversed.
+ */
+public class ProtoIndexRouter implements IIndexRouter {
+
+  private static final Logger logger = LoggerFactory.getLogger(ProtoIndexRouter.class);
+
+  /** for subsequence matching indexes */
+  private Map<String, IndexProcessorStruct> fullPathProcessorMap;
+  /** for whole matching indexes */
+  private Map<PartialPath, IndexProcessorStruct> wildCardProcessorMap;
+
+  private Map<String, Set<String>> sgToFullPathMap;
+  private Map<String, Set<PartialPath>> sgToWildCardPathMap;
+  private MManager mManager;
+  private File routerFile;
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private boolean unmodifiable;
+
+  /**
+   * Creates a modifiable router.
+   *
+   * @param routerFileDir the directory that holds the router file, which is named "router"
+   */
+  ProtoIndexRouter(String routerFileDir) {
+    this(false);
+    String routerFilePath = routerFileDir + File.separator + "router";
+    this.routerFile = SystemFileFactory.INSTANCE.getFile(routerFilePath);
+    this.mManager = MManager.getInstance();
+  }
+
+  /**
+   * Creates a router with empty maps. The router file and the metadata manager are not set here.
+   *
+   * @param unmodifiable true to make add/remove operations on this router fail
+   */
+  private ProtoIndexRouter(boolean unmodifiable) {
+    this.fullPathProcessorMap = new ConcurrentHashMap<>();
+    this.wildCardProcessorMap = new ConcurrentHashMap<>();
+    this.sgToFullPathMap = new ConcurrentHashMap<>();
+    this.sgToWildCardPathMap = new ConcurrentHashMap<>();
+    this.unmodifiable = unmodifiable;
+  }
+
+  /**
+   * Writes the index information of every index series to the router file and optionally closes
+   * all processors.
+   *
+   * <p>File layout: the number of full-path series followed by one record per series, then the
+   * number of wildcard series followed by one record per series. Each record is the series path,
+   * the number of indexes on it and the serialized {@link IndexInfo} of each index.
+   *
+   * <p>Writing and closing are separate steps: a failure while writing is logged and does not
+   * prevent the processors from being closed, and a processor that fails to close neither
+   * corrupts the router file nor prevents the remaining processors from being closed.
+   *
+   * <p>This method takes no lock itself; {@code addIndexIntoRouter} and {@code
+   * removeIndexFromRouter} call it while holding the write lock.
+   *
+   * @param closeProcessor true to close every processor and clear all maps afterwards
+   */
+  @Override
+  public void serialize(boolean closeProcessor) {
+    try (OutputStream outputStream = new FileOutputStream(routerFile)) {
+      ReadWriteIOUtils.write(fullPathProcessorMap.size(), outputStream);
+      for (Entry<String, IndexProcessorStruct> entry : fullPathProcessorMap.entrySet()) {
+        serializeStruct(entry.getKey(), entry.getValue(), outputStream);
+      }
+
+      ReadWriteIOUtils.write(wildCardProcessorMap.size(), outputStream);
+      for (Entry<PartialPath, IndexProcessorStruct> entry : wildCardProcessorMap.entrySet()) {
+        serializeStruct(entry.getKey().getFullPath(), entry.getValue(), outputStream);
+      }
+    } catch (IOException e) {
+      logger.error("Error when serialize router. Given up.", e);
+    }
+    if (closeProcessor) {
+      // close before clearing: once the maps are empty the processors are no longer reachable
+      closeProcessors(fullPathProcessorMap.values());
+      closeProcessors(wildCardProcessorMap.values());
+      fullPathProcessorMap.clear();
+      sgToFullPathMap.clear();
+      sgToWildCardPathMap.clear();
+      wildCardProcessorMap.clear();
+    }
+  }
+
+  /** Writes one record: the series path, the index count and every {@link IndexInfo}. */
+  private static void serializeStruct(
+      String indexSeries, IndexProcessorStruct struct, OutputStream outputStream)
+      throws IOException {
+    ReadWriteIOUtils.write(indexSeries, outputStream);
+    ReadWriteIOUtils.write(struct.infos.size(), outputStream);
+    for (IndexInfo indexInfo : struct.infos.values()) {
+      indexInfo.serialize(outputStream);
+    }
+  }
+
+  /** Closes the processor of every given struct; a failure is logged and the loop goes on. */
+  private static void closeProcessors(Iterable<IndexProcessorStruct> structs) {
+    for (IndexProcessorStruct struct : structs) {
+      try {
+        closeProcessorOf(struct);
+      } catch (IOException e) {
+        logger.error("Error when close index processor of {}", struct.representativePath, e);
+      }
+    }
+  }
+
+  /** Closes the processor of the struct without deleting it, if the struct has a processor. */
+  private static void closeProcessorOf(IndexProcessorStruct struct) throws IOException {
+    if (struct.processor != null) {
+      struct.processor.close(false);
+    }
+  }
+
+  /**
+   * Reloads the router from the router file, if that file exists.
+   *
+   * <p>The file holds two sections with the same layout (full-path series first, wildcard series
+   * second). Every stored index is registered again through {@link #addIndexIntoRouter} without
+   * re-serializing. Any error is logged and ends the reload.
+   *
+   * @param func a function to create the IndexProcessor of each reloaded index series
+   */
+  @Override
+  public void deserializeAndReload(CreateIndexProcessorFunc func) {
+    if (!routerFile.exists()) {
+      return;
+    }
+    try (InputStream inputStream = new FileInputStream(routerFile)) {
+      // full-path section
+      reloadSection(inputStream, func);
+      // wildcard section
+      reloadSection(inputStream, func);
+    } catch (MetadataException | IOException e) {
+      logger.error("Error when deserialize router. Given up.", e);
+    }
+  }
+
+  /** Reads one section (series count, then one record per series) and registers its indexes. */
+  private void reloadSection(InputStream inputStream, CreateIndexProcessorFunc func)
+      throws MetadataException, IOException {
+    int seriesCount = ReadWriteIOUtils.readInt(inputStream);
+    for (int seriesIdx = 0; seriesIdx < seriesCount; seriesIdx++) {
+      String indexSeries = ReadWriteIOUtils.readString(inputStream);
+      int infoCount = ReadWriteIOUtils.readInt(inputStream);
+      for (int infoIdx = 0; infoIdx < infoCount; infoIdx++) {
+        IndexInfo indexInfo = IndexInfo.deserialize(inputStream);
+        addIndexIntoRouter(new PartialPath(indexSeries), indexInfo, func, false);
+      }
+    }
+  }
+
+  /**
+   * Builds an unmodifiable router that contains only the index series recorded for the given
+   * storage group. The returned router shares the {@link IndexProcessorStruct} instances with
+   * this router.
+   *
+   * @param storageGroupPath the path of a storage group
+   * @return a new router; it is empty if nothing is recorded for the storage group
+   */
+  @Override
+  public IIndexRouter getRouterByStorageGroup(String storageGroupPath) {
+    lock.readLock().lock();
+    try {
+      ProtoIndexRouter subRouter = new ProtoIndexRouter(true);
+      Set<PartialPath> wildcardPaths = sgToWildCardPathMap.get(storageGroupPath);
+      if (wildcardPaths != null) {
+        for (PartialPath wildcardPath : wildcardPaths) {
+          subRouter.wildCardProcessorMap.put(
+              wildcardPath, this.wildCardProcessorMap.get(wildcardPath));
+        }
+      }
+      Set<String> fullPaths = sgToFullPathMap.get(storageGroupPath);
+      if (fullPaths != null) {
+        for (String fullPath : fullPaths) {
+          subRouter.fullPathProcessorMap.put(fullPath, this.fullPathProcessorMap.get(fullPath));
+        }
+      }
+      return subRouter;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public int getIndexNum() {
+    return fullPathProcessorMap.size() + wildCardProcessorMap.size();
+  }
+
+  /**
+   * Looks up the struct registered for the query path, under the read lock.
+   *
+   * <p>A full path is looked up in {@code fullPathProcessorMap}, any other path in {@code
+   * wildCardProcessorMap}. {@code indexType} and {@code context} are not used here.
+   *
+   * @return the struct registered for {@code partialPath}
+   * @throws QueryIndexException if no index is registered on the path or the struct has no
+   *     processor
+   */
+  @Override
+  public IndexProcessorStruct startQueryAndCheck(
+      PartialPath partialPath, IndexType indexType, QueryContext context)
+      throws QueryIndexException {
+    lock.readLock().lock();
+    try {
+      final IndexProcessorStruct found;
+      if (partialPath.isFullPath()) {
+        String fullPath = partialPath.getFullPath();
+        found = fullPathProcessorMap.get(fullPath);
+        if (found == null) {
+          throw new QueryIndexException("haven't create index on " + fullPath);
+        }
+      } else {
+        found = wildCardProcessorMap.get(partialPath);
+        if (found == null) {
+          throw new QueryIndexException("haven't create index on " + partialPath);
+        }
+      }
+      if (found.processor == null) {
+        throw new QueryIndexException("Index hasn't insert any data");
+      }
+      // the index processor itself is deliberately not locked here
+      return found;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /** This implementation performs no work when a query ends and ignores all arguments. */
+  @Override
+  public void endQuery(PartialPath indexSeries, IndexType indexType, QueryContext context) {
+    // do nothing.
+  }
+
+  /**
+   * Registers an index on an index series.
+   *
+   * <p>A full path is stored in {@code fullPathProcessorMap}, any other path in {@code
+   * wildCardProcessorMap}. If the series has no struct yet, {@code func} creates its processor;
+   * otherwise the index is added to the existing struct. In both cases the processor is asked to
+   * refresh its index map, and the series is recorded under its storage group.
+   *
+   * <p>The storage group lookup runs inside the {@code try} block so that a failed lookup cannot
+   * leave the write lock held, and {@code doSerialize} is honoured for both kinds of path.
+   *
+   * @param partialPath the index series; it is lower-cased in place
+   * @param indexInfo the index information
+   * @param func a function to create a new IndexProcessor, if it's not created before
+   * @param doSerialize true to serialize the router immediately
+   * @return always true
+   * @throws MetadataException if this router is unmodifiable, the storage group cannot be
+   *     resolved, or a wildcard path currently matches no time series
+   */
+  @Override
+  public boolean addIndexIntoRouter(
+      PartialPath partialPath,
+      IndexInfo indexInfo,
+      CreateIndexProcessorFunc func,
+      boolean doSerialize)
+      throws MetadataException {
+    if (unmodifiable) {
+      throw new MetadataException("cannot add index to unmodifiable router");
+    }
+    partialPath.toLowerCase();
+    IndexType indexType = indexInfo.getIndexType();
+    lock.writeLock().lock();
+    try {
+      // record the relationship between the storage group and the index series
+      StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+      String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+      if (partialPath.isFullPath()) {
+        String fullPath = partialPath.getFullPath();
+        IndexProcessorStruct struct = fullPathProcessorMap.get(fullPath);
+        if (struct == null) {
+          Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+          infoMap.put(indexType, indexInfo);
+          IndexProcessor processor = func.act(partialPath, infoMap);
+          struct = new IndexProcessorStruct(processor, partialPath, infoMap);
+          fullPathProcessorMap.put(fullPath, struct);
+        } else {
+          struct.infos.put(indexType, indexInfo);
+        }
+        struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+
+        // add to sg
+        sgToFullPathMap.computeIfAbsent(storageGroupPath, k -> new HashSet<>()).add(fullPath);
+      } else {
+        IndexProcessorStruct struct = wildCardProcessorMap.get(partialPath);
+        if (struct == null) {
+          Map<IndexType, IndexInfo> infoMap = new EnumMap<>(IndexType.class);
+          infoMap.put(indexType, indexInfo);
+          IndexProcessor processor = func.act(partialPath, infoMap);
+          PartialPath representativePath = getRepresentativePath(partialPath);
+          struct = new IndexProcessorStruct(processor, representativePath, infoMap);
+          wildCardProcessorMap.put(partialPath, struct);
+        } else {
+          struct.infos.put(indexType, indexInfo);
+        }
+        struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+
+        // add to sg
+        sgToWildCardPathMap
+            .computeIfAbsent(storageGroupPath, k -> new HashSet<>())
+            .add(partialPath);
+      }
+      // persist for full-path and wildcard series alike
+      if (doSerialize) {
+        serialize(false);
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Asks the metadata manager for the time series matching the wildcard path (limit 1, offset
+   * 0) and returns the first one.
+   *
+   * @throws MetadataException if no time series matches
+   */
+  private PartialPath getRepresentativePath(PartialPath wildcardPath) throws MetadataException {
+    List<PartialPath> matchedPaths =
+        mManager.getAllTimeseriesPathWithAlias(wildcardPath, 1, 0).left;
+    if (matchedPaths.isEmpty()) {
+      throw new MetadataException("Please create at least one series before create index");
+    }
+    return matchedPaths.get(0);
+  }
+
+  /**
+   * Removes one index from an index series and serializes the router.
+   *
+   * <p>If the series is registered, the index type is removed from its struct and the processor
+   * is asked to refresh its index map. When no index is left on the series, the series is removed
+   * from the router and from its storage group record, and its processor is closed with {@code
+   * close(true)}. An unregistered series is left alone.
+   *
+   * <p>The storage group lookup runs inside the {@code try} block so that a failed lookup cannot
+   * leave the write lock held.
+   *
+   * @param partialPath the index series; it is lower-cased in place
+   * @param indexType the type of index to be removed
+   * @return always true
+   * @throws MetadataException if this router is unmodifiable or the storage group cannot be
+   *     resolved
+   * @throws IOException if an I/O error occurs while removing the index
+   */
+  @Override
+  public boolean removeIndexFromRouter(PartialPath partialPath, IndexType indexType)
+      throws MetadataException, IOException {
+    if (unmodifiable) {
+      throw new MetadataException("cannot remove index from unmodifiable router");
+    }
+    partialPath.toLowerCase();
+    lock.writeLock().lock();
+    try {
+      // the storage group is needed to update the storage group -> index series records
+      StorageGroupMNode storageGroupMNode = mManager.getStorageGroupNodeByPath(partialPath);
+      String storageGroupPath = storageGroupMNode.getPartialPath().getFullPath();
+
+      if (partialPath.isFullPath()) {
+        String fullPath = partialPath.getFullPath();
+        IndexProcessorStruct struct = fullPathProcessorMap.get(fullPath);
+        if (struct != null) {
+          struct.infos.remove(indexType);
+          struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+          if (struct.infos.isEmpty()) {
+            Set<String> indexSeriesSet = sgToFullPathMap.get(storageGroupPath);
+            if (indexSeriesSet != null) {
+              indexSeriesSet.remove(fullPath);
+            }
+            fullPathProcessorMap.remove(fullPath);
+            struct.representativePath = null;
+            struct.processor.close(true);
+            struct.processor = null;
+          }
+        }
+      } else {
+        IndexProcessorStruct struct = wildCardProcessorMap.get(partialPath);
+        if (struct != null) {
+          struct.infos.remove(indexType);
+          struct.processor.refreshSeriesIndexMapFromMManager(struct.infos);
+          if (struct.infos.isEmpty()) {
+            Set<PartialPath> indexSeriesSet = sgToWildCardPathMap.get(storageGroupPath);
+            if (indexSeriesSet != null) {
+              indexSeriesSet.remove(partialPath);
+            }
+            wildCardProcessorMap.remove(partialPath);
+            struct.representativePath = null;
+            struct.processor.close(true);
+            struct.processor = null;
+          }
+        }
+      }
+      serialize(false);
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  @Override
+  public Map<IndexType, IndexInfo> getIndexInfosByIndexSeries(PartialPath indexSeries) {
+    lock.readLock().lock();
+    try {
+      if (wildCardProcessorMap.containsKey(indexSeries)) {
+        return Collections.unmodifiableMap(wildCardProcessorMap.get(indexSeries).infos);
+      }
+      if (fullPathProcessorMap.containsKey(indexSeries.getFullPath())) {
+        return Collections.unmodifiableMap(
+            fullPathProcessorMap.get(indexSeries.getFullPath()).infos);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    return new HashMap<>();
+  }
+
+  /** @return a new list holding every wildcard struct followed by every full-path struct */
+  @Override
+  public Iterable<IndexProcessorStruct> getAllIndexProcessorsAndInfo() {
+    lock.readLock().lock();
+    List<IndexProcessorStruct> allStructs =
+        new ArrayList<>(wildCardProcessorMap.size() + fullPathProcessorMap.size());
+    try {
+      allStructs.addAll(wildCardProcessorMap.values());
+      allStructs.addAll(fullPathProcessorMap.values());
+    } finally {
+      lock.readLock().unlock();
+    }
+    return allStructs;
+  }
+
+  @Override
+  public Iterable<IndexProcessor> getIndexProcessorByPath(PartialPath timeSeries) {
+    lock.readLock().lock();
+    List<IndexProcessor> res = new ArrayList<>();
+    try {
+      if (fullPathProcessorMap.containsKey(timeSeries.getFullPath())) {
+        res.add(fullPathProcessorMap.get(timeSeries.getFullPath()).processor);
+      } else {
+        wildCardProcessorMap.forEach(
+            (k, v) -> {
+              if (k.matchFullPath(timeSeries)) {
+                res.add(v.processor);
+              }
+            });
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    return res;
+  }
+
+  /** @return the string form of every struct, one per line */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    getAllIndexProcessorsAndInfo()
+        .forEach(struct -> text.append(struct.toString()).append("\n"));
+    return text.toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
new file mode 100644
index 0000000..fe42852
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/IIndexUsable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Records where an index can be trusted.
+ *
+ * <p>Data that has already been flushed may still change because of "unsequence data" or
+ * deletions, and not every index technique can follow such updates or deletions. The index
+ * framework therefore keeps an "index usability range": inside a time range marked as "index
+ * unusable", the pruning phase of the index is not guaranteed to be correct.
+ *
+ * <p>A natural way to deal with it is to pass the data of the "index unusable" range directly to
+ * the post-processing phase (also called the refinement phase).
+ *
+ * <p>TODO The IIndexUsable's update due to the "merge" finishing hasn't been taken in account.
+ */
+public interface IIndexUsable {
+
+  /**
+   * Marks a range as index-usable.
+   *
+   * @param fullPath the path of time series
+   * @param start start timestamp
+   * @param end end timestamp
+   */
+  void addUsableRange(PartialPath fullPath, long start, long end);
+
+  /**
+   * Takes a range out of the index-usable ranges.
+   *
+   * @param fullPath the path of time series
+   * @param start start timestamp
+   * @param end end timestamp
+   */
+  void minusUsableRange(PartialPath fullPath, long start, long end);
+
+  /**
+   * The format of the result differs between "sub-matching" ({@linkplain
+   * SubMatchIndexUsability}) and "whole-matching" ({@linkplain WholeMatchIndexUsability}).
+   *
+   * @return the range where index is unusable.
+   */
+  Object getUnusableRange();
+
+  /** Writes this usability record to the given stream. */
+  void serialize(OutputStream outputStream) throws IOException;
+
+  /** Fills this usability record from the given stream. */
+  void deserialize(InputStream inputStream) throws IllegalPathException, IOException;
+
+  /** Static factory holder; it cannot be instantiated. */
+  class Factory {
+
+    private Factory() {
+      // hidden initializer
+    }
+
+    /**
+     * @param path the index series
+     * @return a new {@link SubMatchIndexUsability} for a full path, otherwise a new {@link
+     *     WholeMatchIndexUsability}
+     */
+    public static IIndexUsable createEmptyIndexUsability(PartialPath path) {
+      return path.isFullPath() ? new SubMatchIndexUsability() : new WholeMatchIndexUsability();
+    }
+
+    /**
+     * Creates the usability record that fits the path and fills it from the stream.
+     *
+     * @param path the index series
+     * @param inputStream the stream to read the record from
+     * @return the deserialized usability record
+     */
+    public static IIndexUsable deserializeIndexUsability(PartialPath path, InputStream inputStream)
+        throws IOException, IllegalPathException {
+      IIndexUsable usability = createEmptyIndexUsability(path);
+      usability.deserialize(inputStream);
+      return usability;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
new file mode 100644
index 0000000..6ae41d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsability.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is to record the index usable range for a single long series, which corresponds to the
+ * subsequence matching scenario.
+ *
+ * <p>The series path in the initialized parameters must be a full path (without wildcard
+ * characters).
+ *
+ * <p>It's not thread-safe.
+ */
+public class SubMatchIndexUsability implements IIndexUsable {
+
+  /** First timestamp marked usable by the constructor when {@code initAllUsable} is set. */
+  private static final long INIT_USABLE_START = 0;
+
+  /** Last timestamp marked usable by the constructor when {@code initAllUsable} is set. */
+  private static final long INIT_USABLE_END = Long.MAX_VALUE - 1;
+
+  /** Upper bound on the number of list nodes; once reached, a range is no longer split. */
+  private int maxSizeOfUsableSegments;
+
+  private final boolean initAllUsable;
+
+  /** Head of the singly linked list of unusable ranges, kept in ascending time order. */
+  private RangeNode unusableRanges;
+
+  /** Number of nodes in the {@code unusableRanges} list. */
+  private int size;
+
+  SubMatchIndexUsability() {
+    this(IoTDBDescriptor.getInstance().getConfig().getDefaultMaxSizeOfUnusableSegments(), true);
+  }
+
+  /**
+   * Construction
+   *
+   * @param maxSizeOfUsableSegments the max number of segments kept in the list
+   * @param initAllUsable true if the time range [0, Long.MAX_VALUE - 1] is initially set to
+   *     "usable"; otherwise the whole time range starts as unusable.
+   */
+  SubMatchIndexUsability(int maxSizeOfUsableSegments, boolean initAllUsable) {
+    this.maxSizeOfUsableSegments = maxSizeOfUsableSegments;
+    this.initAllUsable = initAllUsable;
+    // start from a single node covering everything, i.e. nothing is usable
+    unusableRanges = new RangeNode(Long.MIN_VALUE, Long.MAX_VALUE, null);
+    size = 1;
+    if (initAllUsable) {
+      addUsableRange(null, INIT_USABLE_START, INIT_USABLE_END);
+    }
+  }
+
+  /**
+   * Marks {@code [startTime, endTime]} as usable by cutting it out of the unusable ranges.
+   *
+   * <p>If cutting would split one unusable range into two while the list already holds {@code
+   * maxSizeOfUsableSegments} nodes, that range is left untouched.
+   *
+   * @param fullPath not used by this implementation
+   * @param startTime start timestamp; {@code Long.MIN_VALUE} is treated as {@code MIN_VALUE + 1}
+   * @param endTime end timestamp; {@code Long.MAX_VALUE} is treated as {@code MAX_VALUE - 1}
+   */
+  @Override
+  public void addUsableRange(PartialPath fullPath, long startTime, long endTime) {
+    // simplify the problem: the two sentinel timestamps always stay unusable
+    if (startTime == Long.MIN_VALUE) {
+      startTime = startTime + 1;
+    }
+    if (endTime == Long.MAX_VALUE) {
+      endTime = endTime - 1;
+    }
+    RangeNode node = locateIdxByTime(startTime);
+    RangeNode prevNode = node;
+    while (node != null && node.start <= endTime) {
+      assert node.start <= node.end;
+      assert startTime <= endTime;
+      if (node.end < startTime) {
+        // this node lies entirely before the new usable range
+        prevNode = node;
+        node = node.next;
+      } else if (startTime <= node.start && node.end <= endTime) {
+        // the unusable range is covered.
+        prevNode.next = node.next;
+        node = node.next;
+        size--;
+      } else if (node.start <= startTime && endTime <= node.end) {
+        if (node.start == startTime) {
+          // left aligned
+          node.start = endTime + 1;
+          prevNode = node;
+          node = node.next;
+        } else if (node.end == endTime) {
+          // right aligned
+          node.end = startTime - 1;
+          prevNode = node;
+          node = node.next;
+        }
+        // the unusable range is split. If it reaches the upper bound, not split it
+        else if (size < maxSizeOfUsableSegments) {
+          RangeNode newNode = new RangeNode(endTime + 1, node.end, node.next);
+          node.end = startTime - 1;
+          node.next = newNode;
+          prevNode = newNode;
+          node = newNode.next;
+          size++;
+        } else {
+          // don't split, thus do nothing.
+          prevNode = node;
+          node = node.next;
+        }
+      } else if (startTime < node.start) {
+        // the usable range overlaps the head of this node: trim the node from the left
+        node.start = endTime + 1;
+        prevNode = node;
+        node = node.next;
+      } else {
+        // the usable range overlaps the tail of this node: trim the node from the right
+        node.end = startTime - 1;
+        prevNode = node;
+        node = node.next;
+      }
+    }
+  }
+
+  /**
+   * Marks {@code [startTime, endTime]} as unusable, merging it with the unusable ranges it
+   * overlaps or touches.
+   *
+   * <p>If the range touches no existing node and the list already holds {@code
+   * maxSizeOfUsableSegments} nodes, no node is added; instead the closer neighbour is extended to
+   * cover the range.
+   *
+   * @param fullPath not used by this implementation
+   * @param startTime start timestamp; {@code Long.MIN_VALUE} is treated as {@code MIN_VALUE + 1}
+   * @param endTime end timestamp; {@code Long.MAX_VALUE} is treated as {@code MAX_VALUE - 1}
+   */
+  @Override
+  public void minusUsableRange(PartialPath fullPath, long startTime, long endTime) {
+    // simplify the problem
+    if (startTime == Long.MIN_VALUE) {
+      startTime = startTime + 1;
+    }
+    if (endTime == Long.MAX_VALUE) {
+      endTime = endTime - 1;
+    }
+    RangeNode node = locateIdxByTime(startTime);
+    if (endTime <= node.end) {
+      // the range is already inside the located unusable node
+      return;
+    }
+    // add the unusable range into the list
+    RangeNode newNode;
+    if (startTime <= node.end + 1) {
+      // overlapping this node
+      node.end = endTime;
+      newNode = node;
+      node = newNode.next;
+    } else if (node.next.start <= endTime + 1) {
+      // overlapping (or adjacent to) the following node: grow that node instead
+      newNode = node.next;
+      if (startTime < newNode.start) {
+        newNode.start = startTime;
+      }
+      if (newNode.end < endTime) {
+        newNode.end = endTime;
+      }
+      node = newNode.next;
+    } else {
+      // non-overlap with former and latter nodes
+      if (size < maxSizeOfUsableSegments) {
+        // it doesn't overlap with latter node, new node is inserted
+        newNode = new RangeNode(startTime, endTime, node.next);
+        node.next = newNode;
+        size++;
+      } else {
+        // we cannot add more range, merge it with closer neighbor.
+        if (startTime - node.end < node.next.start - endTime) {
+          node.end = endTime;
+        } else {
+          node.next.start = startTime;
+        }
+      }
+      return;
+    }
+
+    // merge the overlapped list
+    while (node != null && node.start <= endTime + 1) {
+      if (newNode.end < node.end) {
+        newNode.end = node.end;
+      }
+      // delete the redundant node
+      newNode.next = node.next;
+      node = node.next;
+      size--;
+    }
+  }
+
+  /**
+   * Returns one time filter per unusable range, in list order.
+   *
+   * @return filters of the form {@code time >= start && time <= end}
+   */
+  @Override
+  public List<Filter> getUnusableRange() {
+    List<Filter> res = new ArrayList<>();
+    RangeNode p = this.unusableRanges;
+    while (p != null) {
+      res.add(toFilter(p.start, p.end));
+      p = p.next;
+    }
+    return res;
+  }
+
+  /**
+   * Find the last node, excluding the tail of the list, whose start time is strictly less than
+   * {@code timestamp}. A naive scanning search for the linked list. Further optimization is still
+   * going on.
+   *
+   * <p>For a single-node list the head is returned. Otherwise the returned node satisfies {@code
+   * node.start < timestamp}; callers only pass timestamps greater than {@code Long.MIN_VALUE},
+   * which is the start of the head node, so a node is always found.
+   */
+  private RangeNode locateIdxByTime(long timestamp) {
+    if (unusableRanges.next == null) {
+      return unusableRanges;
+    }
+    RangeNode res = null;
+    RangeNode node = unusableRanges;
+    while (node.next != null) {
+      if (node.start >= timestamp) {
+        break;
+      }
+      res = node;
+      node = node.next;
+    }
+    return res;
+  }
+
+  /** Writes the node count, the segment limit and then the list of ranges. */
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(size, outputStream);
+    ReadWriteIOUtils.write(maxSizeOfUsableSegments, outputStream);
+    RangeNode.serialize(unusableRanges, outputStream);
+  }
+
+  /** Reads back what {@link #serialize(OutputStream)} wrote, replacing the current list. */
+  @Override
+  public void deserialize(InputStream inputStream) throws IOException {
+    size = ReadWriteIOUtils.readInt(inputStream);
+    maxSizeOfUsableSegments = ReadWriteIOUtils.readInt(inputStream);
+    unusableRanges = RangeNode.deserialize(inputStream);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(String.format("size:%d,", size));
+    RangeNode node = unusableRanges;
+    while (node != null) {
+      sb.append(node.toString());
+      node = node.next;
+    }
+    return sb.toString();
+  }
+
+  private static Filter toFilter(long startTime, long endTime) {
+    return FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
+  }
+
+  /**
+   * Tells whether some range has been marked unusable.
+   *
+   * <p>When created with {@code initAllUsable}, the untouched list consists of exactly two nodes,
+   * {@code [MIN, INIT_USABLE_START - 1]} and {@code [INIT_USABLE_END + 1, MAX]}; any other shape
+   * means that an unusable range exists. Otherwise the answer is derived from the node count.
+   *
+   * @return true if the list differs from its "everything usable" state
+   */
+  public boolean hasUnusableRange() {
+    if (initAllUsable) {
+      // compare against the boundaries that the constructor's addUsableRange call leaves behind
+      return !(unusableRanges.end == INIT_USABLE_START - 1
+          && unusableRanges.next != null
+          && unusableRanges.next.start == INIT_USABLE_END + 1
+          && unusableRanges.next.next == null);
+    } else {
+      return size > 1;
+    }
+  }
+
+  /** A node of the linked list: the closed range {@code [start, end]} and its successor. */
+  private static class RangeNode {
+
+    long start;
+    long end;
+    RangeNode next;
+
+    RangeNode(long start, long end, RangeNode next) {
+      this.start = start;
+      this.end = end;
+      this.next = next;
+    }
+
+    @Override
+    public String toString() {
+      return "["
+          + (start == Long.MIN_VALUE ? "MIN" : start)
+          + ","
+          + (end == Long.MAX_VALUE ? "MAX" : end)
+          + "],";
+    }
+
+    /** Writes the list length followed by the {@code start} and {@code end} of every node. */
+    public static void serialize(RangeNode head, OutputStream outputStream) throws IOException {
+      int listLength = 0;
+      RangeNode tmp = head;
+      while (tmp != null) {
+        listLength++;
+        tmp = tmp.next;
+      }
+      ReadWriteIOUtils.write(listLength, outputStream);
+      while (head != null) {
+        ReadWriteIOUtils.write(head.start, outputStream);
+        ReadWriteIOUtils.write(head.end, outputStream);
+        head = head.next;
+      }
+    }
+
+    /**
+     * Rebuilds a list written by {@link #serialize(RangeNode, OutputStream)}.
+     *
+     * @return the head of the list, or null if the stored length is not positive
+     */
+    public static RangeNode deserialize(InputStream inputStream) throws IOException {
+      int listLength = ReadWriteIOUtils.readInt(inputStream);
+      // placeholder node so that the first real node needs no special case
+      RangeNode fakeHead = new RangeNode(-1, -1, null);
+      RangeNode node = fakeHead;
+      for (int i = 0; i < listLength; i++) {
+        long start = ReadWriteIOUtils.readLong(inputStream);
+        long end = ReadWriteIOUtils.readLong(inputStream);
+        RangeNode nextNode = new RangeNode(start, end, null);
+        node.next = nextNode;
+        node = nextNode;
+      }
+      return fakeHead.next;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
new file mode 100644
index 0000000..d35bb81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsability.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class is to record the index usable range for a list of short series, which corresponds to
+ * the whole matching scenario.
+ *
+ * <p>The series path involves wildcard characters. One series is marked as "index-usable" or
+ * "index-unusable".
+ *
+ * <p>It's not thread-safe.
+ */
+public class WholeMatchIndexUsability implements IIndexUsable {
+
+  /** Series that have been marked as "index-unusable". */
+  private final Set<PartialPath> unusableSeriesSet = new HashSet<>();
+
+  WholeMatchIndexUsability() {
+    // starts with no series marked unusable
+  }
+
+  /** Currently a no-op: a series is not moved back to "usable" by this call. */
+  @Override
+  public void addUsableRange(PartialPath fullPath, long start, long end) {
+    // do nothing temporarily
+  }
+
+  /**
+   * Marks the whole series {@code fullPath} as unusable; the timestamps are not looked at.
+   *
+   * @param fullPath the series to record as unusable
+   * @param start ignored
+   * @param end ignored
+   */
+  @Override
+  public void minusUsableRange(PartialPath fullPath, long start, long end) {
+    unusableSeriesSet.add(fullPath);
+  }
+
+  /** Returns a read-only view of the series marked unusable. */
+  @Override
+  public Set<PartialPath> getUnusableRange() {
+    return Collections.unmodifiableSet(unusableSeriesSet);
+  }
+
+  /** Writes the number of unusable series followed by the full path of each of them. */
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(unusableSeriesSet.size(), outputStream);
+    for (PartialPath series : unusableSeriesSet) {
+      ReadWriteIOUtils.write(series.getFullPath(), outputStream);
+    }
+  }
+
+  /** Reads a count and then that many path strings, adding each path to the unusable set. */
+  @Override
+  public void deserialize(InputStream inputStream) throws IllegalPathException, IOException {
+    int remaining = ReadWriteIOUtils.readInt(inputStream);
+    while (remaining-- > 0) {
+      String storedPath = ReadWriteIOUtils.readString(inputStream);
+      unusableSeriesSet.add(new PartialPath(storedPath));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index 1f006d9..fb19c9a 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -156,6 +156,15 @@ public class PartialPath extends Path implements Comparable<Path> {
     return true;
   }
 
+  /**
+   * Checks that this path contains no wildcard node.
+   *
+   * @return false if any node equals {@code IoTDBConstant.PATH_WILDCARD}, true otherwise
+   */
+  public boolean isFullPath() {
+    for (int i = 0; i < nodes.length; i++) {
+      if (nodes[i].equals(IoTDBConstant.PATH_WILDCARD)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public String getFullPath() {
     if (fullPath != null) {
@@ -312,4 +321,13 @@ public class PartialPath extends Path implements Comparable<Path> {
     }
     return ret;
   }
+
+  /**
+   * Converts this path to lower case in place: every node, the cached {@code fullPath} (rebuilt
+   * from the converted nodes) and, when they are not null, {@code measurementAlias} and {@code
+   * tsAlias}.
+   *
+   * <p>The conversion uses {@link java.util.Locale#ROOT} so that the result does not depend on
+   * the default locale of the JVM (under a Turkish locale, for example, "I" would otherwise not
+   * become "i").
+   */
+  public void toLowerCase() {
+    for (int i = 0; i < nodes.length; i++) {
+      nodes[i] = nodes[i].toLowerCase(java.util.Locale.ROOT);
+    }
+    fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
+    if (measurementAlias != null) {
+      measurementAlias = measurementAlias.toLowerCase(java.util.Locale.ROOT);
+    }
+    if (tsAlias != null) {
+      tsAlias = tsAlias.toLowerCase(java.util.Locale.ROOT);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 54a0bf7..db9d416 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -39,11 +39,15 @@ import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.QueryIdNotExsitException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.index.IndexManagerException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.IndexType;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -76,6 +80,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
@@ -83,6 +88,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
@@ -193,6 +199,7 @@ public class PlanExecutor implements IPlanExecutor {
   protected IQueryRouter queryRouter;
   // for administration
   private IAuthorizer authorizer;
+  private IndexManager indexManager;
 
   private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
 
@@ -200,6 +207,7 @@ public class PlanExecutor implements IPlanExecutor {
     queryRouter = new QueryRouter();
     try {
       authorizer = BasicAuthorizer.getInstance();
+      indexManager = IndexManager.getInstance();
     } catch (AuthException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -325,9 +333,9 @@ public class PlanExecutor implements IPlanExecutor {
       case STOP_TRIGGER:
         return operateStopTrigger((StopTriggerPlan) plan);
       case CREATE_INDEX:
-        throw new QueryProcessException("Create index hasn't been supported yet");
+        return createIndex((CreateIndexPlan) plan);
       case DROP_INDEX:
-        throw new QueryProcessException("Drop index hasn't been supported yet");
+        return dropIndex((DropIndexPlan) plan);
       case KILL:
         try {
           operateKillQuery((KillQueryPlan) plan);
@@ -479,7 +487,7 @@ public class PlanExecutor implements IPlanExecutor {
         GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
         queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
       } else if (queryPlan instanceof QueryIndexPlan) {
-        throw new QueryProcessException("Query index hasn't been supported yet");
+        queryDataSet = queryRouter.indexQuery((QueryIndexPlan) queryPlan, context);
       } else if (queryPlan instanceof AggregationPlan) {
         AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
         queryDataSet = queryRouter.aggregate(aggregationPlan, context);
@@ -1750,4 +1758,33 @@ public class PlanExecutor implements IPlanExecutor {
     }
     return noExistSg;
   }
+
+  /**
+   * Executes a create-index plan by handing a copy of its paths and an {@link IndexInfo} built
+   * from its index type, time and properties to the {@link IndexManager}.
+   *
+   * @return true when the index manager call returns normally
+   * @throws QueryProcessException an {@link IndexManagerException} wrapping any {@link
+   *     MetadataException} raised by the index manager
+   */
+  private boolean createIndex(CreateIndexPlan createIndexPlan) throws QueryProcessException {
+    List<PartialPath> targetPaths = new ArrayList<>(createIndexPlan.getPaths());
+    long startTime = createIndexPlan.getTime();
+    IndexInfo indexInfo =
+        new IndexInfo(createIndexPlan.getIndexType(), startTime, createIndexPlan.getProps());
+    try {
+      IndexManager.getInstance().createIndex(targetPaths, indexInfo);
+      return true;
+    } catch (MetadataException e) {
+      throw new IndexManagerException(e);
+    }
+  }
+
+  /**
+   * Executes a drop-index plan by handing a copy of its paths and its index type to the {@link
+   * IndexManager}.
+   *
+   * @return true when the index manager call returns normally
+   * @throws QueryProcessException an {@link IndexManagerException} built from the {@link
+   *     MetadataException}, or from the message of the {@link IOException}, raised by the index
+   *     manager
+   */
+  private boolean dropIndex(DropIndexPlan dropIndexPlan) throws QueryProcessException {
+    List<PartialPath> targetPaths = new ArrayList<>(dropIndexPlan.getPaths());
+    IndexType typeToDrop = dropIndexPlan.getIndexType();
+    try {
+      IndexManager.getInstance().dropIndex(targetPaths, typeToDrop);
+      return true;
+    } catch (MetadataException e) {
+      throw new IndexManagerException(e);
+    } catch (IOException e2) {
+      // only the message of the I/O failure is carried over
+      throw new IndexManagerException(e2.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 27d20f1..9410c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -28,6 +28,7 @@ public class QueryIndexPlan extends RawDataQueryPlan {
 
   private Map<String, Object> props;
   private IndexType indexType;
+  private boolean alignedByTime = false;
 
   public QueryIndexPlan() {
     super();
@@ -50,6 +51,14 @@ public class QueryIndexPlan extends RawDataQueryPlan {
     this.props = props;
   }
 
+  /** Returns the current value of the {@code alignedByTime} flag of this plan. */
+  public boolean isAlignedByTime() {
+    return alignedByTime;
+  }
+
+  /** Sets the {@code alignedByTime} flag of this plan to the given value. */
+  public void setAlignedByTime(boolean alignedByTime) {
+    this.alignedByTime = alignedByTime;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
index aef7399..989aa10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.query.executor;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -65,4 +67,14 @@ public interface IQueryRouter {
   /** Execute UDTF query */
   QueryDataSet udtfQuery(UDTFPlan udtfPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException, InterruptedException;
+
+  /**
+   * Execute index query
+   *
+   * @param queryPlan the index query plan to execute
+   * @param context the query context handed to the execution
+   * @return the data set produced by the index query
+   */
+  QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+      throws QueryIndexException, StorageEngineException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 3478c6c..fd67530 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -20,13 +20,16 @@
 package org.apache.iotdb.db.query.executor;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.QueryIndexException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.read.QueryIndexExecutor;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -128,7 +131,7 @@ public class QueryRouter implements IQueryRouter {
 
     AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
 
-    QueryDataSet dataSet = null;
+    QueryDataSet dataSet;
 
     if (optimizedExpression != null
         && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
@@ -252,6 +255,13 @@ public class QueryRouter implements IQueryRouter {
     return lastQueryExecutor.execute(context, lastQueryPlan);
   }
 
+  /** Runs the index query through a {@link QueryIndexExecutor} created for this plan. */
+  @Override
+  public QueryDataSet indexQuery(QueryIndexPlan queryPlan, QueryContext context)
+      throws QueryIndexException, StorageEngineException {
+    return new QueryIndexExecutor(queryPlan, context).executeIndexQuery();
+  }
+
   protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
     return new LastQueryExecutor(lastQueryPlan);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index c2368c4..7332945 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.index.IndexManager;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.control.TracingManager;
@@ -116,6 +117,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
+    registerManager.register(IndexManager.getInstance());
 
     registerManager.register(RPCService.getInstance());
     if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index b474b2c..0704e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -50,7 +50,10 @@ public enum ServiceType {
 
   FLUSH_SERVICE(
       "Flush ServerService", generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
-  CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor");
+  CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor"),
+
+  INDEX_SERVICE(
+      "Index ServerService", generateJmxName("org.apache.iotdb.db.index", "Index Manager"));
 
   private final String name;
   private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 952a86c..28142d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.index.read.IndexQueryDataSet;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
 import org.apache.iotdb.db.qp.Planner;
@@ -58,6 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
@@ -667,6 +669,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setIgnoreTimeStamp(false);
       }
 
+      if (plan instanceof QueryIndexPlan) {
+        resp.setColumns(
+            newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+        resp.setDataTypeList(
+            newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+        resp.setColumnNameIndexMap(((IndexQueryDataSet) newDataSet).getPathToIndex());
+      }
       if (newDataSet instanceof DirectNonAlignDataSet) {
         resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
       } else {
@@ -847,6 +856,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
                       .getOutputDataType());
         }
         break;
+      case QUERY_INDEX:
+        // For query index, we don't know the columns before actual query.
+        // It will be deferred after obtaining query result set.
+        break;
       default:
         throw new TException("unsupported query type: " + plan.getOperatorType());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..82a521c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -571,4 +572,32 @@ public abstract class TVList {
   public long getLastTime() {
     return getTime(size - 1);
   }
+
+  /**
+   * Renders the list as {@code {[time,value],[time,value],}}. Only INT32, INT64, FLOAT and DOUBLE
+   * lists are supported; FLOAT and DOUBLE values are printed with two decimals.
+   *
+   * @throws NotImplementedException if a point is rendered while the data type is any other type
+   */
+  @TestOnly
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("{");
+    int index = 0;
+    while (index < size) {
+      TimeValuePair tv = getTimeValuePair(index++);
+      String entry;
+      switch (getDataType()) {
+        case INT32:
+          entry = String.format("[%d,%d],", tv.getTimestamp(), tv.getValue().getInt());
+          break;
+        case INT64:
+          entry = String.format("[%d,%d],", tv.getTimestamp(), tv.getValue().getLong());
+          break;
+        case FLOAT:
+          entry = String.format("[%d,%.2f],", tv.getTimestamp(), tv.getValue().getFloat());
+          break;
+        case DOUBLE:
+          entry = String.format("[%d,%.2f],", tv.getTimestamp(), tv.getValue().getDouble());
+          break;
+        default:
+          throw new NotImplementedException(getDataType().toString());
+      }
+      builder.append(entry);
+    }
+    return builder.append("}").toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..6f545b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -221,7 +221,8 @@ public class TsFileRecoverPerformer {
             new MemTableFlushTask(
                 recoverMemTable,
                 restorableTsFileIOWriter,
-                tsFileResource.getTsFile().getParentFile().getParentFile().getName());
+                tsFileResource.getTsFile().getParentFile().getParentFile().getName(),
+                sequence);
         tableFlushTask.syncFlushMemTable();
         tsFileResource.updatePlanIndexes(recoverMemTable.getMinPlanIndex());
         tsFileResource.updatePlanIndexes(recoverMemTable.getMaxPlanIndex());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index a1b5d55..0576236 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -69,7 +69,8 @@ public class MemTableFlushTaskTest {
         MemTableTestUtils.deviceId0,
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
-    MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+    MemTableFlushTask memTableFlushTask =
+        new MemTableFlushTask(memTable, writer, storageGroup, true);
     assertTrue(
         writer
             .getVisibleMetadataList(
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
new file mode 100644
index 0000000..a9fe55f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/NoIndexWriteIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.it;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.common.math.Randomwalk;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test checking that inserts and flushes succeed, and that the index router dump stays
+ * the same, while {@code NO_INDEX} indexes are registered on one concrete series and on one
+ * wildcard path.
+ */
+public class NoIndexWriteIT {
+
+  private static final String insertPattern = "INSERT INTO %s(timestamp, %s) VALUES (%d, %.3f)";
+  private static final String createSeriesPattern =
+      "CREATE TIMESERIES %s WITH DATATYPE=FLOAT,ENCODING=PLAIN";
+  private static final String createIndexPattern = "CREATE INDEX ON %s WITH INDEX=%s";
+
+  private static final String storageGroupSub = "root.wind1";
+  private static final String storageGroupWhole = "root.wind2";
+
+  private static final String speed1 = "root.wind1.azq01.speed";
+  private static final String speed1Device = "root.wind1.azq01";
+  private static final String speed1Sensor = "speed";
+
+  private static final String directionDevicePattern = "root.wind2.%d";
+  private static final String directionPattern = "root.wind2.%d.direction";
+  private static final String directionSensor = "direction";
+
+  private static final String indexSub = speed1;
+  private static final String indexWhole = "root.wind2.*.direction";
+  private static final int wholeSize = 5;
+  private static final int wholeDim = 100;
+  private static final int subLength = 10_000;
+
+  /** Router dump expected at every checkpoint once both indexes have been created. */
+  private static final String expectedRouter =
+      "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+          + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n"
+          + "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {}]}\n"
+          + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n";
+
+  @Before
+  public void setUp() throws Exception {
+    // the index switch is turned on before the environment is set up
+    IoTDBDescriptor.getInstance().getConfig().setEnableIndex(true);
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    // restore the switch so that other tests are not affected
+    IoTDBDescriptor.getInstance().getConfig().setEnableIndex(false);
+  }
+
+  /** Prints the current router and asserts that its dump equals {@link #expectedRouter}. */
+  private static void assertRouterContent() {
+    System.out.println(IndexManager.getInstance().getRouter());
+    Assert.assertEquals(expectedRouter, IndexManager.getInstance().getRouter().toString());
+  }
+
+  /**
+   * Creates two storage groups, their series and two {@code NO_INDEX} indexes, then inserts and
+   * flushes data for both, checking the router dump after index creation and after each flush.
+   *
+   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
+   */
+  @Test
+  public void checkWrite() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      long start = System.currentTimeMillis();
+      statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupSub));
+      statement.execute(String.format("SET STORAGE GROUP TO %s", storageGroupWhole));
+
+      // format the statement once and reuse it for logging and execution
+      String createSpeed1 = String.format(createSeriesPattern, speed1);
+      System.out.println(createSpeed1);
+      statement.execute(createSpeed1);
+
+      for (int i = 0; i < wholeSize; i++) {
+        String wholePath = String.format(directionPattern, i);
+        statement.execute(String.format(createSeriesPattern, wholePath));
+      }
+      long startCreateIndex = System.currentTimeMillis();
+      statement.execute(String.format(createIndexPattern, indexSub, NO_INDEX));
+      statement.execute(String.format(createIndexPattern, indexWhole, NO_INDEX));
+
+      assertRouterContent();
+
+      TVList subInput = Randomwalk.generateRanWalkTVList(subLength);
+      long startInsertSub = System.currentTimeMillis();
+      for (int i = 0; i < subInput.size(); i++) {
+        statement.execute(
+            String.format(
+                insertPattern,
+                speed1Device,
+                speed1Sensor,
+                subInput.getTime(i),
+                subInput.getDouble(i)));
+      }
+      statement.execute("flush");
+      System.out.println("insert finish for subsequence case");
+      assertRouterContent();
+
+      long startInsertWhole = System.currentTimeMillis();
+      TVList wholeInput = Randomwalk.generateRanWalkTVList(wholeDim * wholeSize);
+      // NOTE(review): only one point per device is inserted here although wholeDim * wholeSize
+      // points are generated -- confirm whether wholeDim points per series were intended.
+      for (int i = 0; i < wholeSize; i++) {
+        String device = String.format(directionDevicePattern, i);
+        statement.execute(
+            String.format(
+                insertPattern,
+                device,
+                directionSensor,
+                wholeInput.getTime(i),
+                wholeInput.getDouble(i)));
+      }
+      statement.execute("flush");
+      long end = System.currentTimeMillis();
+      assertRouterContent();
+      System.out.println("insert finish for whole matching case");
+      System.out.println(String.format("create series use %d ms", (startCreateIndex - start)));
+      System.out.println(
+          String.format("create index  use %d ms", (startInsertSub - startCreateIndex)));
+      System.out.println(
+          String.format("insert sub    use %d ms", (startInsertWhole - startInsertSub)));
+      System.out.println(String.format("insert whole  use %d ms", (end - startInsertWhole)));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
new file mode 100644
index 0000000..69af9e9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/router/ProtoIndexRouterTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.router;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.index.IndexProcessor;
+import org.apache.iotdb.db.index.common.IndexInfo;
+import org.apache.iotdb.db.index.common.func.CreateIndexProcessorFunc;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_SLIDE_STEP;
+import static org.apache.iotdb.db.index.common.IndexConstant.INDEX_WINDOW_RANGE;
+import static org.apache.iotdb.db.index.common.IndexConstant.PAA_DIM;
+import static org.apache.iotdb.db.index.common.IndexType.NO_INDEX;
+
+/**
+ * Walks a {@link ProtoIndexRouter} through registration, lookup by storage group, serialization,
+ * reload and removal of indexes, comparing its string dump at every step.
+ */
+public class ProtoIndexRouterTest {
+
+  private static final String testRouterDir = "test_protoIndexRouter";
+
+  private static final String storageGroupSub = "root.wind1";
+  private static final String storageGroupFull = "root.wind2";
+  private static final String speed1 = "root.wind1.azq01.speed";
+  private static final String direction1 = "root.wind2.1.direction";
+  private static final String direction2 = "root.wind2.2.direction";
+  private static final String direction3 = "root.wind2.3.direction";
+  private static final String subIndexPath = speed1;
+  private static final String fullIndexPath = "root.wind2.*.direction";
+
+  /** Dump of the router entry registered on the wildcard path. */
+  private static final String fullEntryDump =
+      "<{NO_INDEX=[type: NO_INDEX, time: 5, props: {PAA_DIM=5}]}\n"
+          + "root.wind2.*.direction: {NO_INDEX=NO_INDEX}>\n";
+
+  /** Dump of the router entry registered on the single series. */
+  private static final String subEntryDump =
+      "<{NO_INDEX=[type: NO_INDEX, time: 0, props: {INDEX_SLIDE_STEP=8, INDEX_WINDOW_RANGE=4}]}\n"
+          + "root.wind1.azq01.speed: {NO_INDEX=NO_INDEX}>\n";
+
+  /** Dump of a router holding both entries. */
+  private static final String bothEntriesDump = fullEntryDump + subEntryDump;
+
+  private IndexInfo infoFull;
+  private IndexInfo infoSub;
+  private CreateIndexProcessorFunc fakeCreateFunc;
+
+  /**
+   * Initializes the metadata manager with two storage groups and four FLOAT series, then builds
+   * the index infos and the processor factory used by the test.
+   */
+  private void prepareMManager() throws MetadataException {
+    MManager metadata = MManager.getInstance();
+    metadata.init();
+    metadata.setStorageGroup(new PartialPath(storageGroupSub));
+    metadata.setStorageGroup(new PartialPath(storageGroupFull));
+    // same series, same creation order as before, just driven by a loop
+    for (String seriesPath : new String[] {speed1, direction1, direction2, direction3}) {
+      metadata.createTimeseries(
+          new PartialPath(seriesPath),
+          TSDataType.FLOAT,
+          TSEncoding.PLAIN,
+          CompressionType.UNCOMPRESSED,
+          null);
+    }
+
+    Map<String, String> subProps = new HashMap<>();
+    subProps.put(INDEX_WINDOW_RANGE, "4");
+    subProps.put(INDEX_SLIDE_STEP, "8");
+
+    Map<String, String> fullProps = new HashMap<>();
+    fullProps.put(PAA_DIM, "5");
+
+    infoSub = new IndexInfo(NO_INDEX, 0, subProps);
+    infoFull = new IndexInfo(NO_INDEX, 5, fullProps);
+    fakeCreateFunc =
+        (indexSeries, indexInfoMap) -> {
+          String processorDir = testRouterDir + File.separator + "index_fake_" + indexSeries;
+          return new IndexProcessor(indexSeries, processorDir);
+        };
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    File routerDir = new File(testRouterDir);
+    if (!routerDir.exists()) {
+      routerDir.mkdirs();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    File routerDir = new File(testRouterDir);
+    if (routerDir.exists()) {
+      FileUtils.deleteDirectory(routerDir);
+    }
+  }
+
+  @Test
+  public void testBasic() throws MetadataException, IOException {
+    prepareMManager();
+    ProtoIndexRouter router = new ProtoIndexRouter(testRouterDir);
+
+    // register one index per path
+    router.addIndexIntoRouter(new PartialPath(subIndexPath), infoSub, fakeCreateFunc, true);
+    router.addIndexIntoRouter(new PartialPath(fullIndexPath), infoFull, fakeCreateFunc, true);
+    Assert.assertEquals(bothEntriesDump, router.toString());
+
+    // restricting to one storage group keeps only that group's entry
+    ProtoIndexRouter routerOfFullGroup =
+        (ProtoIndexRouter) router.getRouterByStorageGroup(storageGroupFull);
+    Assert.assertEquals(fullEntryDump, routerOfFullGroup.toString());
+
+    // serialize(false) leaves the dump untouched, serialize(true) leaves it empty
+    router.serialize(false);
+    Assert.assertEquals(bothEntriesDump, router.toString());
+    router.serialize(true);
+    Assert.assertEquals("", router.toString());
+
+    // a fresh router on the same directory reloads both entries
+    IIndexRouter reloaded = new ProtoIndexRouter(testRouterDir);
+    reloaded.deserializeAndReload(fakeCreateFunc);
+    Assert.assertEquals(bothEntriesDump, reloaded.toString());
+
+    // removing the wildcard index; a second removal of the same index changes nothing
+    reloaded.removeIndexFromRouter(new PartialPath(fullIndexPath), NO_INDEX);
+    Assert.assertEquals(subEntryDump, reloaded.toString());
+    reloaded.removeIndexFromRouter(new PartialPath(fullIndexPath), NO_INDEX);
+    Assert.assertEquals(subEntryDump, reloaded.toString());
+
+    // removing the last index empties the router
+    reloaded.removeIndexFromRouter(new PartialPath(subIndexPath), NO_INDEX);
+    Assert.assertEquals("", reloaded.toString());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..b177a14
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/SubMatchIndexUsabilityTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Unit tests for {@link SubMatchIndexUsability}.
+ *
+ * <p>Each test mutates one usability object step by step with {@code addUsableRange} and {@code
+ * minusUsableRange} and compares its {@code toString()} dump after every step, so the order of the
+ * statements matters. Judging from {@link #subMatchingToFilter()}, the ranges printed in the dump
+ * appear to be the unusable ranges. The first constructor argument presumably bounds the number of
+ * ranges that may be kept (see {@link #reachUpBoundAndStopSplit()}) -- TODO confirm against the
+ * implementation.
+ */
+public class SubMatchIndexUsabilityTest {
+
+  /** Adding usable ranges splits, covers and re-splits the printed ranges; then round-trips. */
+  @Test
+  public void addUsableRange1() throws IOException {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    // split range
+    usability.addUsableRange(null, 1, 9);
+    Assert.assertEquals("size:2,[MIN,0],[10,MAX],", usability.toString());
+    System.out.println(usability);
+    usability.addUsableRange(null, 21, 29);
+    Assert.assertEquals("size:3,[MIN,0],[10,20],[30,MAX],", usability.toString());
+    System.out.println(usability);
+    usability.addUsableRange(null, 41, 49);
+    Assert.assertEquals("size:4,[MIN,0],[10,20],[30,40],[50,MAX],", usability.toString());
+    System.out.println(usability);
+    // cover a range
+    usability.addUsableRange(null, 29, 45);
+    Assert.assertEquals("size:3,[MIN,0],[10,20],[50,MAX],", usability.toString());
+    System.out.println(usability);
+
+    // split a range
+    usability.addUsableRange(null, 14, 17);
+    Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability.toString());
+    System.out.println(usability);
+
+    // serialize, deserialize into a fresh object and expect the same dump
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:4,[MIN,0],[10,13],[18,20],[50,MAX],", usability2.toString());
+  }
+
+  /** Adding usable ranges that overlap existing printed ranges on the left, right or entirely. */
+  @Test
+  public void addUsableRange2() throws IOException {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    usability.addUsableRange(null, 1, 19);
+    usability.addUsableRange(null, 51, 59);
+    usability.addUsableRange(null, 81, 99);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[20,50],[60,80],[100,MAX],", usability.toString());
+    // left cover
+    usability.addUsableRange(null, 10, 29);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[30,50],[60,80],[100,MAX],", usability.toString());
+    // right cover
+    usability.addUsableRange(null, 71, 99);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,0],[30,50],[60,70],[100,MAX],", usability.toString());
+    // cover multiple ranges at once: [30,50] and [60,70] both disappear
+    usability.addUsableRange(null, 20, 80);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability.toString());
+
+    // serialize, deserialize into a fresh object and expect the same dump
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:2,[MIN,0],[100,MAX],", usability2.toString());
+  }
+
+  /** Removing usable ranges extends, inserts and merges printed ranges; then round-trips. */
+  @Test
+  public void minusUsableRange() throws IOException {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+
+    // merge with MIN, MAX
+    usability.minusUsableRange(null, 1, 19);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+    // new range is covered by the first range
+    usability.addUsableRange(null, 51, 89);
+    usability.minusUsableRange(null, 1, 19);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,50],[90,MAX],", usability.toString());
+
+    usability.addUsableRange(null, 101, 259);
+    // new range extends a node's end time
+    usability.minusUsableRange(null, 51, 60);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,60],[90,100],[260,MAX],", usability.toString());
+
+    // new range extends a node's start time
+    usability.minusUsableRange(null, 80, 89);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,60],[80,100],[260,MAX],", usability.toString());
+
+    // new range is inserted as an individual node [120, 140]
+    usability.minusUsableRange(null, 120, 140);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+    // re-insert: new range is exactly the same as an existing node
+    usability.minusUsableRange(null, 120, 140);
+    System.out.println(usability);
+    Assert.assertTrue(usability.hasUnusableRange());
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[120,140],[260,MAX],", usability.toString());
+
+    // re-insert: new range extends both sides of an existing node
+    usability.minusUsableRange(null, 110, 150);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[260,MAX],", usability.toString());
+
+    // an isolated range, but the number of segments has reached the upper bound, so the range is
+    // merged with the closer neighbor.
+    usability.minusUsableRange(null, 200, 220);
+    System.out.println(usability);
+    Assert.assertEquals("size:4,[MIN,60],[80,100],[110,150],[200,MAX],", usability.toString());
+
+    // a range covers several nodes.
+    usability.minusUsableRange(null, 50, 90);
+    System.out.println(usability);
+    Assert.assertEquals("size:3,[MIN,100],[110,150],[200,MAX],", usability.toString());
+
+    // a range covers several nodes.
+    usability.minusUsableRange(null, 105, 200);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,100],[105,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+    // the end: the last gap is closed and a single [MIN,MAX] range remains
+    usability.minusUsableRange(null, 101, 107);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    Assert.assertFalse(usability.hasUnusableRange());
+
+    // serialize, deserialize into a fresh object and expect the same dump
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    SubMatchIndexUsability usability2 = new SubMatchIndexUsability();
+    usability2.deserialize(in);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability2.toString());
+  }
+
+  /** The unusable ranges are exposed as one closed time filter per range. */
+  @Test
+  public void subMatchingToFilter() {
+    SubMatchIndexUsability a = new SubMatchIndexUsability(10, false);
+    Assert.assertFalse(a.hasUnusableRange());
+    a.addUsableRange(null, 31, 39);
+    a.addUsableRange(null, 61, 89);
+    System.out.println(a);
+
+    Assert.assertTrue(a.hasUnusableRange());
+    List<Filter> res = a.getUnusableRange();
+    Assert.assertEquals(3, res.size());
+    System.out.println(res);
+    // the outermost bounds are printed as Long.MIN_VALUE and Long.MAX_VALUE
+    Assert.assertEquals(
+        "[(time >= -9223372036854775808 && time <= 30), (time >= 40 && time <= 60), (time >= 90 && time <= 9223372036854775807)]",
+        res.toString());
+  }
+
+  /** A removed range lying inside an already printed range leaves the dump unchanged. */
+  @Test
+  public void minusUsableRange2() {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(4, false);
+    Assert.assertFalse(usability.hasUnusableRange());
+    usability.addUsableRange(null, 0, 15);
+    usability.minusUsableRange(null, 20, 24);
+    Assert.assertEquals("size:2,[MIN,-1],[16,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+    System.out.println(usability.toString());
+  }
+
+  /** With an upper bound of 2, the dump never grows beyond two ranges. */
+  @Test
+  public void reachUpBoundAndStopSplit() {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(2, false);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+
+    usability.addUsableRange(null, 5, 7);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+    // this range would need a third segment, so the dump stays as it is
+    usability.addUsableRange(null, 2, 3);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[8,MAX],", usability.toString());
+    usability.minusUsableRange(null, 6, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[6,MAX],", usability.toString());
+    usability.addUsableRange(null, 6, 9);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,4],[10,MAX],", usability.toString());
+    usability.minusUsableRange(null, 6, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,6],[10,MAX],", usability.toString());
+    usability.addUsableRange(null, 2, 6);
+    System.out.println(usability);
+    Assert.assertEquals("size:2,[MIN,1],[10,MAX],", usability.toString());
+  }
+
+  /** Ranges spanning Long.MIN_VALUE to Long.MAX_VALUE are handled at both extremes. */
+  @Test
+  public void testBoundCase() {
+    SubMatchIndexUsability usability = new SubMatchIndexUsability(2, true);
+    usability.addUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+    System.out.println(usability);
+    Assert.assertEquals(
+        "size:2,[MIN,-9223372036854775808],[9223372036854775807,MAX],", usability.toString());
+    usability.minusUsableRange(null, Long.MIN_VALUE, Long.MAX_VALUE);
+    System.out.println(usability);
+    Assert.assertEquals("size:1,[MIN,MAX],", usability.toString());
+    Assert.assertTrue(usability.hasUnusableRange());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
new file mode 100644
index 0000000..edf49fb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/index/usable/WholeMatchIndexUsabilityTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index.usable;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+/** Unit test for {@link WholeMatchIndexUsability}: unusable-series bookkeeping and round trip. */
+public class WholeMatchIndexUsabilityTest {
+
+  /**
+   * Marks three series as unusable, checks the reported set, then serializes the object and checks
+   * that a freshly deserialized instance reports the same set.
+   */
+  @Test
+  public void testMinusUsableRange() throws IllegalPathException, IOException {
+    WholeMatchIndexUsability usability = new WholeMatchIndexUsability();
+    // addUsableRange on series that were never marked unusable must not show up in the result
+    usability.addUsableRange(new PartialPath("root.sg.d.s10"), 1, 2);
+    usability.addUsableRange(new PartialPath("root.sg.d.s11"), 1, 2);
+
+    usability.minusUsableRange(new PartialPath("root.sg.d.s1"), 1, 2);
+    usability.minusUsableRange(new PartialPath("root.sg.d.s2"), 1, 2);
+    usability.minusUsableRange(new PartialPath("root.sg.d.s3"), 1, 2);
+    Set<PartialPath> ret = usability.getUnusableRange();
+    Assert.assertEquals("[root.sg.d.s3, root.sg.d.s2, root.sg.d.s1]", ret.toString());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    usability.serialize(out);
+    InputStream in = new ByteArrayInputStream(out.toByteArray());
+    WholeMatchIndexUsability usable2 = new WholeMatchIndexUsability();
+    usable2.deserialize(in);
+    // inspect the deserialized instance (not the original one) so that the round trip is really
+    // verified; compare as sets so the check does not depend on iteration order
+    Set<PartialPath> ret2 = usable2.getUnusableRange();
+    Assert.assertEquals(ret, ret2);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index ab7ddf3..dd36009 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.index.IndexManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -121,6 +122,11 @@ public class EnvironmentUtils {
       fail();
     }
 
+    // clean index
+    if (config.isEnableIndex()) {
+      IndexManager.getInstance().deleteAll();
+    }
+
     IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
 
     // clean cache
@@ -211,6 +217,8 @@ public class EnvironmentUtils {
     cleanDir(config.getQueryDir());
     // delete tracing
     cleanDir(config.getTracingDir());
+    // delete index
+    cleanDir(config.getIndexRootFolder());
     // delete data files
     for (String dataDir : config.getDataDirs()) {
       cleanDir(dataDir);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f2fd3c1..a90dcad 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,6 +65,7 @@ public enum TSStatusCode {
 
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),
+  INDEX_QUERY_ERROR(423),
 
   INTERNAL_SERVER_ERROR(500),
   CLOSE_OPERATION_ERROR(501),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 014d45a..02f1d16 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -128,6 +128,30 @@ public class ReadWriteIOUtils {
     }
   }
 
+  /**
+   * Writes a string-to-string map to the stream: first the number of entries, then every key
+   * followed by its value, in the iteration order of the map.
+   *
+   * @param map the entries to write
+   * @param stream the destination stream
+   * @return 4 (accounted for the entry count) plus the lengths reported by the key and value writes
+   * @throws IOException if writing to the stream fails
+   */
+  public static int write(Map<String, String> map, OutputStream stream) throws IOException {
+    write(map.size(), stream);
+    // the entry count is accounted as a 4-byte int
+    int written = 4;
+    for (Entry<String, String> kv : map.entrySet()) {
+      // operands are evaluated left to right: the key is written before the value
+      written += write(kv.getKey(), stream) + write(kv.getValue(), stream);
+    }
+    return written;
+  }
+
+  /**
+   * Reads a string-to-string map from the stream: an entry count followed by that many key/value
+   * string pairs.
+   *
+   * @param inputStream the stream to read from
+   * @return a new map holding the entries that were read
+   * @throws IOException if reading from the stream fails
+   */
+  public static Map<String, String> readMap(InputStream inputStream) throws IOException {
+    int entryCount = readInt(inputStream);
+    Map<String, String> result = new HashMap<>(entryCount);
+    for (int remaining = entryCount; remaining > 0; remaining--) {
+      // arguments are evaluated left to right, so the key is read before its value
+      result.put(readString(inputStream), readString(inputStream));
+    }
+    return result;
+  }
+
   public static int write(Map<String, String> map, ByteBuffer buffer) {
     int length = 0;
     byte[] bytes;
@@ -701,6 +725,29 @@ public class ReadWriteIOUtils {
     return bytes;
   }
 
+  /**
+   * Reads bytes from an input stream whose first 4 bytes are an integer giving the length of the
+   * data that follows.
+   *
+   * <p>A single {@link InputStream#read(byte[])} call may return fewer bytes than requested even
+   * when more data will arrive, so the payload is read in a loop until it is complete or the end
+   * of the stream is reached.
+   *
+   * @param inputStream contains a length followed by that many bytes of data
+   * @return a buffer positioned at 0 whose limit is the self-described length
+   * @throws IOException if the self-described length is negative, or if the stream ends before
+   *     that many bytes have been read
+   */
+  public static ByteBuffer readByteBufferWithSelfDescriptionLength(InputStream inputStream)
+      throws IOException {
+    int length = readInt(inputStream);
+    if (length < 0) {
+      // corrupted input: report it as an I/O problem instead of a NegativeArraySizeException
+      throw new IOException("Invalid self-described length: " + length);
+    }
+    byte[] bytes = new byte[length];
+    int readLen = 0;
+    while (readLen < length) {
+      int count = inputStream.read(bytes, readLen, length - readLen);
+      if (count < 0) {
+        // end of stream before the announced number of bytes was available
+        break;
+      }
+      readLen += count;
+    }
+    if (readLen != length) {
+      throw new IOException(String.format(RETURN_ERROR, length, readLen));
+    }
+    // wrapping yields position 0 and limit == length without copying the array again
+    return ByteBuffer.wrap(bytes);
+  }
+
   /** read bytes from buffer with offset position to the end of buffer. */
   public static int readAsPossible(TsFileInput input, long position, ByteBuffer buffer)
       throws IOException {