You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/06/29 13:24:22 UTC

[incubator-iotdb] 01/01: PISA demo

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pisa
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit cce4c1acf65d4372482e831873cf2dae7b88edb9
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Mon Jun 29 21:23:53 2020 +0800

    PISA demo
---
 .../org/apache/iotdb/IndexContractExample.java     |  60 ++
 .../main/java/org/apache/iotdb/IndexExample.java   | 176 ++++++
 .../main/java/org/apache/iotdb/JDBCExample.java    |   2 +-
 .../src/main/java/org/apache/iotdb/Solution.java   |   0
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |   4 +-
 .../IndexException.java}                           |  22 +-
 .../UnSupportedIndexTypeException.java}            |  19 +-
 .../db/exception/query/QueryProcessException.java  |   4 +
 .../db/exception/runtime/SQLParserException.java   |   9 +-
 .../org/apache/iotdb/db/index/FloatDigest.java     | 232 ++++++++
 .../org/apache/iotdb/db/index/IndexManager.java    |  33 ++
 .../java/org/apache/iotdb/db/index/LoadData.java   |  46 ++
 .../java/org/apache/iotdb/db/index/PisaIndex.java  | 618 +++++++++++++++++++++
 .../org/apache/iotdb/db/index/PisaIndexNode.java   | 109 ++++
 .../org/apache/iotdb/db/index/PisaIndexTree.java   |  63 +++
 .../java/org/apache/iotdb/db/index/QueryData.java  |  31 ++
 .../org/apache/iotdb/db/index/storage/Config.java  |  54 ++
 .../iotdb/db/index/storage/StorageFactory.java     |  43 ++
 .../storage/interfaces/IBackendModelCreator.java   |  30 +
 .../index/storage/interfaces/IBackendReader.java   |  29 +
 .../index/storage/interfaces/IBackendWriter.java   |  23 +
 .../db/index/storage/memory/FakeByteStore.java     | 140 +++++
 .../iotdb/db/index/storage/memory/FakeStore.java   | 131 +++++
 .../db/index/storage/model/FixWindowPackage.java   | 112 ++++
 .../model/serializer/DigestSerializeFactory.java   |  35 ++
 .../storage/model/serializer/DigestSerializer.java |  41 ++
 .../serializer/FixWindowPackageSerializer.java     |  77 +++
 .../model/serializer/FloatDigestSerializer.java    |  81 +++
 .../iotdb/db/index/utils/DataDigestUtil.java       | 125 +++++
 .../apache/iotdb/db/index/utils/DigestUtil.java    | 109 ++++
 .../iotdb/db/index/utils/ForestRootStack.java      |  47 ++
 .../org/apache/iotdb/db/index/utils/MyBytes.java   | 255 +++++++++
 .../org/apache/iotdb/db/metadata/MLogWriter.java   |  10 +
 .../org/apache/iotdb/db/metadata/MManager.java     | 164 ++++--
 .../iotdb/db/metadata/MetadataOperationType.java   |   2 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 140 ++++-
 .../iotdb/db/qp/logical/crud/IndexOperator.java    |  59 ++
 .../iotdb/db/qp/physical/crud/IndexPlan.java       |  61 ++
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |  34 ++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   6 +
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   7 +-
 .../iotdb/db/metadata/MManagerBasicTest.java       |   3 +-
 .../query/executor/GroupByEngineDataSetTest.java   |  24 +-
 .../reader/series/SeriesAggregateReaderTest.java   |   3 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   3 +-
 .../db/query/reader/series/SeriesReaderTest.java   |   3 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |   5 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../file/metadata/statistics/Statistics.java       |  25 +
 .../iotdb/tsfile/read/filter/TimeFilter.java       |   2 +-
 .../tsfile/write/schema/MeasurementSchema.java     |  20 +-
 51 files changed, 3210 insertions(+), 123 deletions(-)

diff --git a/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java b/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java
new file mode 100644
index 0000000..e6f6411
--- /dev/null
+++ b/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import static org.apache.iotdb.JDBCExample.outputResult;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class IndexContractExample {
+
+  public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException {
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+//      insertData(statement);
+
+      long startTime = System.nanoTime();
+      ResultSet resultSet = statement.executeQuery("SELECT sum(*) FROM root WHERE time < 365000");
+      System.out.println((System.nanoTime() - startTime) / 1000000);
+      outputResult(resultSet);
+
+      statement.execute("CREATE INDEX on root.vehicle.d0.s0 USING pisa");
+      statement.execute("DROP INDEX pisa ON root.vehicle.d0.s0");
+    }
+  }
+
+  private static void insertData(Statement statement) throws SQLException {
+    // insert large amount of data time range : 0 ~ 730000
+    for (int time = 0; time < 730000; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+      statement.execute(sql);
+      if (time % 667 == 1) {
+        statement.execute("FLUSH");
+      }
+    }
+  }
+}
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java b/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java
new file mode 100644
index 0000000..373a9c1
--- /dev/null
+++ b/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import static org.apache.iotdb.JDBCExample.outputResult;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class IndexExample {
+
+  public static String[] create_sql = new String[]{"SET STORAGE GROUP TO root.vehicle",
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s5 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+  };
+
+  private static String[] stringValue = new String[]{"A", "B", "C", "D", "E"};
+  private static String[] booleanValue = new String[]{"true", "false"};
+
+  public static void main(String[] args) throws ClassNotFoundException, SQLException {
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+//      insertData(statement);
+
+      ResultSet resultSet = statement.executeQuery("SELECT * FROM root WHERE time < 10");
+      outputResult(resultSet);
+
+      statement.execute("CREATE INDEX on root.vehicle.d0.s0 USING pisa");
+      statement.execute("DROP INDEX pisa ON root.vehicle.d0.s0");
+    }
+  }
+
+  private static void insertData(Statement statement) throws SQLException {
+    // insert large amount of data time range : 3000 ~ 13600
+    for (int time = 3000; time < 13600; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 100);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 17);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 22);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time,
+          stringValue[time % 5]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time,
+          booleanValue[time % 2]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, time);
+      statement.execute(sql);
+    }
+
+    for (int time = 13700; time < 24000; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 40);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 123);
+      statement.execute(sql);
+    }
+
+    statement.execute("merge");
+
+    // buffwrite data, unsealed file
+    for (int time = 100000; time < 101000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 20);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 30);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 77);
+      statement.execute(sql);
+    }
+
+    // sequential data, memory data
+    for (int time = 200000; time < 201000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, -time % 20);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, -time % 30);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, -time % 77);
+      statement.execute(sql);
+    }
+
+    statement.execute("FLUSH");
+    // unsequence insert, time < 3000
+    for (int time = 2000; time < 2500; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time + 1);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time + 2);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time,
+          stringValue[time % 5]);
+      statement.execute(sql);
+    }
+
+    for (int time = 100000; time < 100500; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 666);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 777);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 888);
+      statement.execute(sql);
+    }
+
+    statement.execute("FLUSH");
+    // unsequence insert, time > 200000
+    for (int time = 200900; time < 201000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 6666);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 7777);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 8888);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, "goodman");
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time,
+          booleanValue[time % 2]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, 9999);
+      statement.execute(sql);
+    }
+  }
+
+}
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index 00f1084..8380b89 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@ -58,7 +58,7 @@ public class JDBCExample {
     }
   }
 
-  private static void outputResult(ResultSet resultSet) throws SQLException {
+  static void outputResult(ResultSet resultSet) throws SQLException {
     if (resultSet != null) {
       System.out.println("--------------------------");
       final ResultSetMetaData metaData = resultSet.getMetaData();
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/Solution.java b/example/jdbc/src/main/java/org/apache/iotdb/Solution.java
new file mode 100644
index 0000000..e69de29
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index e6d6329..cd64892 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -34,8 +34,8 @@ statement
     | DELETE STORAGE GROUP fullPath (COMMA fullPath)* #deleteStorageGroup
     | SHOW METADATA #showMetadata // not support yet
     | DESCRIBE prefixPath #describePath // not support yet
-    | CREATE INDEX ON fullPath USING function=ID indexWithClause? whereClause? #createIndex //not support yet
-    | DROP INDEX function=ID ON fullPath #dropIndex //not support yet
+    | CREATE INDEX ON fullPath USING function=ID #createIndex // indexWithClause? whereClause?
+    | DROP INDEX function=ID ON fullPath #dropIndex
     | MERGE #merge
     | FLUSH prefixPath? (COMMA prefixPath)* (booleanClause)?#flush
     | FULL MERGE #fullMerge
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
copy to server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
index f8c007c..10f4ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
@@ -16,25 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.exception.index;
 
-package org.apache.iotdb.db.exception.query;
-
-import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class QueryProcessException extends IoTDBException {
+public class IndexException extends QueryProcessException {
+
 
-  private static final long serialVersionUID = -683191083844850054L;
+  private static final long serialVersionUID = 2585920847533339136L;
 
-  public QueryProcessException(String message) {
-    super(message, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+  public IndexException(Throwable cause) {
+    super(cause, TSStatusCode.INDEX_ERROR.getStatusCode());
   }
 
-  public QueryProcessException(String message, int errorCode) {
-    super(message, errorCode);
+  public IndexException(String message) {
+    super(message, TSStatusCode.INDEX_ERROR.getStatusCode());
   }
 
-  public QueryProcessException(IoTDBException e) {
-    super(e, e.getErrorCode());
+  public IndexException(String message, int errorCode) {
+    super(message, errorCode);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java
rename to server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
index 5fd792b..f96c087 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
@@ -16,23 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.exception.query;
+package org.apache.iotdb.db.exception.index;
 
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class PathException extends QueryProcessException {
+public class UnSupportedIndexTypeException extends IndexException {
 
-  private static final long serialVersionUID = 2141197032898163234L;
+  private static final long serialVersionUID = 4967425512171623007L;
 
-  public PathException() {
-    super("Timeseries is null", TSStatusCode.PATH_ERROR.getStatusCode());
-  }
-
-  public PathException(String message) {
-    super(message, TSStatusCode.PATH_ERROR.getStatusCode());
-  }
-
-  public PathException(String message, int errorCode) {
-    super(message, errorCode);
+  public UnSupportedIndexTypeException(String indexType) {
+    super(String.format("Unsupported index type: [%s]", indexType),
+        TSStatusCode.UNSUPPORTED_INDEX_TYPE_ERROR.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
index f8c007c..6784634 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
@@ -37,4 +37,8 @@ public class QueryProcessException extends IoTDBException {
   public QueryProcessException(IoTDBException e) {
     super(e, e.getErrorCode());
   }
+
+  public QueryProcessException(Throwable cause, int errorCode) {
+    super(cause, errorCode);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
index f0d0d67..f0658d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
@@ -18,15 +18,22 @@
  */
 package org.apache.iotdb.db.exception.runtime;
 
-public class SQLParserException extends RuntimeException{
+public class SQLParserException extends RuntimeException {
+
   private static final long serialVersionUID = 3249707655860110299L;
+
   public SQLParserException() {
     super("Error format in SQL statement, please check whether SQL statement is correct.");
   }
+
   public SQLParserException(String message) {
     super(message);
   }
 
+  public SQLParserException(Throwable cause) {
+    super(cause);
+  }
+
   public SQLParserException(String type, String message) {
     super(String.format("Unsupported type: [%s]. " + message, type));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java b/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java
new file mode 100644
index 0000000..70dbadc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java
@@ -0,0 +1,232 @@
+package org.apache.iotdb.db.index;
+
+import java.math.BigDecimal;
+
+/**
+ * Float's digest has five property: amx,min.average,square of average and starttime
+ *
+ * @author zhangjinrui
+ */
+public class FloatDigest {
+
+  private String key = "";
+  private long startTime = -1L;  // -code
+  private long timeWindow = -1L;
+  private long code = -1L;
+  private long serial = -1L;
+
+  public long parent = -1L;
+
+  private float max = 0;
+  private float min = 0;
+  private float avg = 0;
+  private long count = 0;
+  private BigDecimal squareSum = new BigDecimal(0.0);
+  private boolean isEmpty = false;
+
+  public void setEmpty(boolean empty) {
+    isEmpty = empty;
+  }
+
+  public boolean isEmpty() {
+    return isEmpty;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = super.hashCode();
+    result = prime * result + Float.floatToIntBits(avg);
+    result = prime * result + (int) (count ^ (count >>> 32));
+    result = prime * result + Float.floatToIntBits(max);
+    result = prime * result + Float.floatToIntBits(min);
+    result = prime * result + ((squareSum == null) ? 0 : squareSum.hashCode());
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "FloatDigest [key=" + key + ", startTime=" + startTime + ", timeWindow=" + timeWindow
+        + ", code=" + code + ", parent=" + parent
+        + ", serial=" + serial + ", max=" + max + ", min=" + min + ", avg=" + avg + ", count="
+        + count
+        + ", squareSum=" + squareSum + "]";
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    FloatDigest other = (FloatDigest) obj;
+    if (Float.floatToIntBits(avg) != Float.floatToIntBits(other.avg)) {
+      return false;
+    }
+    if (count != other.count) {
+      return false;
+    }
+    if (Float.floatToIntBits(max) != Float.floatToIntBits(other.max)) {
+      return false;
+    }
+    if (Float.floatToIntBits(min) != Float.floatToIntBits(other.min)) {
+      return false;
+    }
+    if (squareSum == null) {
+      if (other.squareSum != null) {
+        return false;
+      }
+    } else if (!squareSum.equals(other.squareSum)) {
+      return false;
+    }
+    return true;
+  }
+
+  public float getMax() {
+    return max;
+  }
+
+  public void setMax(float max) {
+    this.max = max;
+  }
+
+  public float getMin() {
+    return min;
+  }
+
+  public void setMin(float min) {
+    this.min = min;
+  }
+
+  public float getAvg() {
+    return avg;
+  }
+
+  public void setAvg(float avg) {
+    this.avg = avg;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  public void setCount(long count) {
+    this.count = count;
+  }
+
+  public BigDecimal getSquareSum() {
+    return squareSum;
+  }
+
+  public void setSquareSum(BigDecimal squareSum) {
+    this.squareSum = squareSum;
+  }
+
+  public FloatDigest() {
+    super();
+  }
+
+  public FloatDigest(boolean isEmpty) {
+    this.isEmpty = isEmpty;
+  }
+
+  public FloatDigest(String key, long startTime, long timeWindow, long code, long serial) {
+    this.key = key;
+    this.startTime = startTime;
+    this.timeWindow = timeWindow;
+    this.code = code;
+    this.serial = serial;
+  }
+
+  /**
+   * 经过merge产生的中间节点的  startTime 都是 -code
+   */
+  public FloatDigest(FloatDigest left, FloatDigest right) {
+    this.key = left.key;
+    this.code = right.code + 1;
+    if (left.isEmpty && right.isEmpty) {
+      isEmpty = true;
+      return;
+    }
+    this.startTime = -this.code;
+    this.timeWindow = left.timeWindow + right.timeWindow;
+
+    max = (left.max > right.max) ? left.max : right.max;
+    min = (left.min < right.min) ? left.min : right.min;
+    count = left.count + right.count;
+    avg = (left.avg / (left.count + right.count)) * left.count
+        + (right.avg / (left.count + right.count))
+        * right.count;
+    squareSum = left.squareSum.add(right.squareSum);
+
+    // mark for MPISA
+    left.parent = this.code;
+    right.parent = this.code;
+  }
+
+  public FloatDigest(String key, long startTime, long timeWindow, float max, float min,
+      long count, float avg, BigDecimal squareSum) {
+    this.key = key;
+    this.startTime = startTime;
+    this.timeWindow = timeWindow;
+    this.max = max;
+    this.min = min;
+    this.count = count;
+    this.avg = avg;
+    this.squareSum = squareSum;
+  }
+
+  public FloatDigest generateParent(FloatDigest a, FloatDigest b) {
+    return new FloatDigest(a, b);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getTimeWindow() {
+    return timeWindow;
+  }
+
+  public long getEndTime() {
+    return startTime + timeWindow;
+  }
+
+  public void setTimeWindow(long timeWindow) {
+    this.timeWindow = timeWindow;
+  }
+
+  public long getCode() {
+    return code;
+  }
+
+  public void setCode(long code) {
+    this.code = code;
+  }
+
+  public long getSerial() {
+    return serial;
+  }
+
+  public void setSerial(long serial) {
+    this.serial = serial;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
new file mode 100644
index 0000000..06d88d2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.db.index;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
+
+public class IndexManager {
+
+  private static Map<IndexType, PisaIndex> indexMap = new HashMap<>();
+
+
+  static {
+    indexMap.put(IndexType.PISAIndex, PisaIndex.getInstance());
+  }
+
+  public static PisaIndex getIndexInstance(IndexType indexType) {
+    return indexMap.get(indexType);
+  }
+
+  public enum IndexType {
+    PISAIndex;
+
+    public static IndexType getIndexType(String indexType) throws UnSupportedIndexTypeException {
+      String normalized = indexType.toLowerCase();
+      switch (normalized) {
+        case "pisa":
+          return PISAIndex;
+        default:
+          throw new UnSupportedIndexTypeException(indexType);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/LoadData.java b/server/src/main/java/org/apache/iotdb/db/index/LoadData.java
new file mode 100644
index 0000000..34216b9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/LoadData.java
@@ -0,0 +1,46 @@
+package org.apache.iotdb.db.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.iotdb.db.index.storage.Config;
+import org.apache.iotdb.db.index.storage.StorageFactory;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class LoadData {
+
+  public static void main(String[] args) throws Exception {
+    List<String> cfs = new ArrayList<>();
+    cfs.add(Config.digest_cf);
+    cfs.add(Config.data_cf);
+    IBackendModelCreator schemaCreator = StorageFactory.getBackendModelCreator();
+    schemaCreator.initialize("pisa", 1, cfs);
+
+    loadPisa(Config.timeSeriesName, (long) Math.pow(2, Integer.valueOf(Config.totals)));
+    System.exit(1);
+
+  }
+
+  /**
+   * @param key time series name
+   * @param total total points
+   */
+  private static void loadPisa(String key, long total) throws Exception {
+    PisaIndex<FloatDigest> pisaIndex = new PisaIndex<>();
+    //time window in ms
+    Random random = new Random();
+    long currentTime = 0;
+    for (long i = 0; i < total; i++) {
+      if (i % 100 == 0) {
+        System.out.println("current progress:" + i);
+      }
+      Pair<Long, Float> data = new Pair<>(currentTime++, random.nextFloat());
+      pisaIndex.insertPoint(data);
+
+    }
+
+    pisaIndex.close();
+    System.out.println("total data points:" + total);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java
new file mode 100644
index 0000000..29ca483
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.storage.Config;
+import org.apache.iotdb.db.index.storage.StorageFactory;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.utils.DataDigestUtil;
+import org.apache.iotdb.db.index.utils.DigestUtil;
+import org.apache.iotdb.db.index.utils.ForestRootStack;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PisaIndex<T extends FloatDigest> {
+
+  private Logger logger = LoggerFactory.getLogger(PisaIndex.class);
+  private long maxSerialNo = 0;
+
+  private ForestRootStack<T> rootNodes;
+  protected IBackendReader reader = StorageFactory.getBackaBackendReader();
+  protected IBackendWriter writer = StorageFactory.getBackaBackendWriter();
+  private String rowkey;
+
+  private Pair<Long, Long> currentWindow = new Pair<>(0L, Config.timeWindow);
+  private FixWindowPackage pkg;
+
+
+  private FixWindowPackage leftPackage;
+  private FixWindowPackage rightPackage;
+
+  private static final String INDEX_FILE_PATH =
+      IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0] + File.separator
+          + "index" + File.separator + "test.tsfile";
+
+  public static PisaIndex getInstance() {
+    return PisaIndexHolder.INSTANCE;
+  }
+
+  public boolean build(Path path) throws IndexException {
+    // TODO initial
+    this.rowkey = path.toString();
+    pkg = new FixWindowPackage(rowkey, currentWindow);
+
+    T lastDigest = getLastDigest();
+
+    if (null == lastDigest) {
+      maxSerialNo = 0;
+      rootNodes = new ForestRootStack<T>();
+    } else {
+      maxSerialNo = lastDigest.getSerial();
+      rootNodes = getRoots(lastDigest);
+    }
+
+    // TODO new codes
+    long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+    QueryContext context = new QueryContext(queryId);
+    Map<Long, Statistics> timePartitionMap = new HashMap<>();
+
+    try {
+      TSDataType dataType = SchemaUtils.getSeriesTypeByPath(path);
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(path, context, null);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add(path.getMeasurement());
+      SeriesAggregateReader seriesReader = new SeriesAggregateReader(path, allSensors, dataType,
+          context,
+          queryDataSource, null, null, null);
+
+      while (seriesReader.hasNextChunk()) {
+        if (seriesReader.canUseCurrentChunkStatistics()) {
+          Statistics chunkStatistics = seriesReader.currentChunkStatistics();
+          updateStatistics(timePartitionMap, chunkStatistics);
+          seriesReader.skipCurrentChunk();
+          continue;
+        }
+        while (seriesReader.hasNextPage()) {
+          if (seriesReader.canUseCurrentPageStatistics()) {
+            Statistics pageStatistic = seriesReader.currentPageStatistics();
+            updateStatistics(timePartitionMap, pageStatistic);
+            seriesReader.skipCurrentPage();
+            continue;
+          }
+
+          while (seriesReader.hasNextPage()) {
+            BatchData nextOverlappedPageData = seriesReader.nextPage();
+            while (nextOverlappedPageData.hasCurrent()) {
+              updateStatisticsFromPage(timePartitionMap, nextOverlappedPageData, dataType);
+              nextOverlappedPageData.next();
+            }
+            nextOverlappedPageData.resetBatchData();
+          }
+        }
+      }
+      Set<PisaIndexNode> indexNodeSet = new TreeSet<>(
+          Comparator.comparingLong(PisaIndexNode::getNodeNumber));
+      for (Map.Entry<Long, Statistics> entry : timePartitionMap.entrySet()) {
+        PisaIndexNode node = new PisaIndexNode(entry.getKey(), entry.getValue());
+        indexNodeSet.add(node);
+        PisaIndexNode parentNode = mergeAndGenerateParent(indexNodeSet, node, dataType);
+        while (parentNode != null) {
+          indexNodeSet.add(parentNode);
+          parentNode = mergeAndGenerateParent(indexNodeSet, parentNode, dataType);
+        }
+      }
+
+      // TODO store PISA
+      File f = FSFactoryProducer.getFSFactory().getFile(INDEX_FILE_PATH);
+      if (f.exists()) {
+        f.delete();
+      }
+      TsFileWriter tsFileWriter = new TsFileWriter(f);
+      tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "min"),
+          new MeasurementSchema("max", dataType, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "max"),
+          new MeasurementSchema("max", dataType, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "first"),
+          new MeasurementSchema("first", dataType, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "last"),
+          new MeasurementSchema("last", dataType, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "sum"),
+          new MeasurementSchema("sum", TSDataType.DOUBLE, TSEncoding.RLE));
+
+      for (int i = 0; i < 10000; i++) {
+        for (PisaIndexNode node : indexNodeSet) {
+          TSRecord tsRecord = new TSRecord(node.getNodeNumber(), path.getFullPath() + i);
+          tsRecord.addTuple(DataPoint
+              .getDataPoint(dataType, "min", String.valueOf(node.getStatistics().getMinValue())));
+          tsRecord.addTuple(DataPoint
+              .getDataPoint(dataType, "max", String.valueOf(node.getStatistics().getMaxValue())));
+          tsRecord.addTuple(DataPoint
+              .getDataPoint(dataType, "first",
+                  String.valueOf(node.getStatistics().getFirstValue())));
+          tsRecord.addTuple(DataPoint
+              .getDataPoint(dataType, "last", String.valueOf(node.getStatistics().getLastValue())));
+          tsRecord.addTuple(DataPoint
+              .getDataPoint(TSDataType.DOUBLE, "sum",
+                  String.valueOf(node.getStatistics().getSumValue())));
+
+          // write TSRecord
+          tsFileWriter.write(tsRecord);
+        }
+      }
+      tsFileWriter.close();
+
+      // TODO for experiment
+      // create reader and get the readTsFile interface
+      Path sumPath = new Path(path + "0.sum");
+      TsFileSequenceReader reader = new TsFileSequenceReader(INDEX_FILE_PATH);
+      ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+
+      long startTime = System.nanoTime();
+      Set<Long> queryNodes = getQueryNodes(1, 26);
+      GlobalTimeExpression expression = new GlobalTimeExpression(TimeFilter.in(queryNodes, false));
+
+      List<Path> paths = new ArrayList<>();
+      paths.add(sumPath);
+      QueryExpression queryExpression = QueryExpression.create(paths, expression);
+      QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+      double sum = 0.0;
+      while (queryDataSet.hasNext()) {
+        sum += queryDataSet.next().getFields().get(0).getDoubleV();
+      }
+
+      logger.info("queryNodes: {}, sum: {}, cost time: {}", queryNodes, sum,
+          (System.nanoTime() - startTime) / 100000);
+    } catch (MetadataException | StorageEngineException | IOException | WriteProcessException | QueryProcessException e) {
+      throw new IndexException(e);
+    }
+    return true;
+  }
+
+  private void updateStatistics(Map<Long, Statistics> timePartitionMap, Statistics statistics) {
+    long timePartitionId = fromTimeToTimePartition(statistics.getStartTime()) + 1;
+    if (timePartitionMap.containsKey(timePartitionId)) {
+      timePartitionMap.get(timePartitionId).mergeStatistics(statistics);
+    } else {
+      timePartitionMap.put(timePartitionId, statistics);
+    }
+  }
+
+  private void updateStatisticsFromPage(Map<Long, Statistics> timePartitionMap,
+      BatchData dataInThisPage, TSDataType dataType) {
+    long timePartitionId = fromTimeToTimePartition(dataInThisPage.currentTime());
+    Statistics statistics;
+    if (timePartitionMap.containsKey(timePartitionId)) {
+      statistics = timePartitionMap.get(timePartitionId);
+    } else {
+      statistics = Statistics.getStatsByType(dataType);
+    }
+    statistics.update(dataInThisPage.currentTime(), dataInThisPage.currentValue(), dataType);
+    timePartitionMap.put(timePartitionId, statistics);
+  }
+
+  private long fromTimeToTimePartition(long time) {
+    return time / IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+  }
+
+  private PisaIndexNode mergeAndGenerateParent(Set<PisaIndexNode> indexNodeSet,
+      PisaIndexNode node, TSDataType dataType) {
+    long nodeNumber = node.getNodeNumber();
+    int level = node.getLevel();
+    int bias = (int) Math.pow(2, level) - 1;
+    Statistics statistics = Statistics.getStatsByType(dataType);
+    long parentNodeNumber = 0;
+    for (PisaIndexNode indexNode : indexNodeSet) {
+      if (!indexNode.isInMem() || indexNode.getLevel() != level) {
+        continue;
+      }
+      if (indexNode.getNodeNumber() == nodeNumber - bias) {
+        statistics.mergeStatistics(indexNode.getStatistics());
+        parentNodeNumber = nodeNumber + 1;
+        indexNode.setInMem(false);
+        break;
+      } else if (indexNode.getNodeNumber() == nodeNumber + bias) {
+        statistics.mergeStatistics(indexNode.getStatistics());
+        parentNodeNumber = nodeNumber + bias + 1;
+        indexNode.setInMem(false);
+        break;
+      }
+    }
+    if (statistics.isEmpty()) {
+      return null;
+    }
+    node.setInMem(false);
+    statistics.mergeStatistics(node.getStatistics());
+    return new PisaIndexNode(parentNodeNumber, level + 1, statistics);
+  }
+
+  public boolean drop(Path path) {
+    return true;
+  }
+
+  private T getLastDigest() {
+    T lastDigest = null;
+    try {
+      System.out.println("rowkey:" + rowkey);
+      lastDigest = (T) reader.getLatestDigest(rowkey);
+    } catch (Exception e) {
+      e.printStackTrace();
+      logger.error(String.format("Read lastest digest error : (%s)", rowkey));
+    }
+    return lastDigest;
+  }
+
+
+  public void insertPoint(Pair<Long, Float> data) throws Exception {
+    if (pkg.cover(data.left)) {
+      pkg.add(data);
+    } else {
+      if (!pkg.isEmpty()) {
+        writer.write(rowkey, Config.data_cf, pkg.getStartTime(), pkg);
+      }
+
+      // insert middle packages
+      while (data.left >= currentWindow.right) {
+        FloatDigest digest = pkg.getDigest();
+        insert((T) digest);
+        currentWindow = new Pair<>(currentWindow.right, currentWindow.right + Config.timeWindow);
+        pkg = new FixWindowPackage(rowkey, currentWindow);
+      }
+      pkg.add(data);
+    }
+  }
+
+  public void close() throws Exception {
+    if (!pkg.isEmpty()) {
+      writer.write(rowkey, Config.data_cf, pkg.getStartTime(), pkg);
+      FloatDigest digest = pkg.getDigest();
+      insert((T) digest);
+    }
+  }
+
+  protected ForestRootStack<T> getRoots(T digest) {
+    ForestRootStack<T> rootStack = new ForestRootStack<T>();
+
+    List<Long> rootCodes = getRootCodes(maxSerialNo);
+    Long[] codeArray = rootCodes.toArray(new Long[]{});
+    if (codeArray.length != 0) {
+      T[] rootArray = (T[]) reader.getDigests(rowkey, codeArray);
+
+      for (int i = rootArray.length - 1; i >= 0; --i) {
+        rootStack.push(rootArray[i]);
+      }
+    }
+    if (digest.getSerial() % 2 == 1) {
+      rootStack.push(digest);
+    }
+
+    return rootStack;
+  }
+
+  //get all root codes in memory by a serial number
+  protected List<Long> getRootCodes(long serialNo) {
+    List<Long> rootCodes = new ArrayList<Long>();
+    long tmpUpperSerial = serialNo;
+
+    if (tmpUpperSerial % 2 == 1) {
+      tmpUpperSerial--;
+    }
+
+    while (tmpUpperSerial > 1) {
+      long rightestCode = DigestUtil.serialToCode(tmpUpperSerial);
+      long parentCode = DigestUtil.getRootCodeBySerialNum(tmpUpperSerial);
+
+      //why negative??????
+      rootCodes.add(-parentCode);
+
+      long leftestCode = DigestUtil.getLeftestCode(parentCode, rightestCode);
+      tmpUpperSerial = DigestUtil.codeToSerial(leftestCode) - 1;
+    }
+
+    return rootCodes;
+  }
+
+  public int insert(T digest) throws Exception {
+    ++maxSerialNo;
+    digest.setSerial(maxSerialNo);
+
+    long code = DigestUtil.serialToCode(maxSerialNo);
+    digest.setCode(code);
+
+    //digest nodes to be persistent
+    List<T> digests = new ArrayList<>();
+    rootNodes.push(digest);
+    //first add the leaf node of digest forest
+    digests.add(digest);
+
+    logger.debug("Insert Node code number : " + code);
+    if (maxSerialNo % 2 == 0) {
+      long parentCode = DigestUtil.getRootCodeBySerialNum(maxSerialNo);
+      //new generated internal nodes
+      List<T> parentNodes = generateParents(code + 1, parentCode);
+      digests.addAll(parentNodes);
+    }
+    for (T digestNode : digests) {
+      //flush all nodes in digests into disk
+      writer.write(rowkey, Config.digest_cf, digestNode.getStartTime(),
+          digestNode);
+    }
+    return digests.size();
+  }
+
+  private List<T> generateParents(long lowCodeNum, long upCodeNum) {
+    List<T> digests = new ArrayList<>();
+
+    for (long count = lowCodeNum; count <= upCodeNum; ++count) {
+      Pair<T, T> children = rootNodes.popPair();
+      T parentNode = generateParent(children.left, children.right);
+      logger.debug("Generate parent code number : " + parentNode.getCode());
+      rootNodes.push(parentNode);
+      digests.add(parentNode);
+    }
+
+    return digests;
+  }
+
+  @SuppressWarnings("unchecked")
+  private T generateParent(T a, T b) {
+    return (T) a.generateParent(a, b);
+  }
+
+  public void clean() {
+    this.leftPackage = null;
+    this.rightPackage = null;
+  }
+
+  /**
+   * return how many nodes we will read
+   */
+  public int queryPlan(Pair<Long, Long> range) {
+    leftPackage = reader.getBeforeOrEqualPackage(rowkey, range.left);
+    rightPackage = reader.getBeforeOrEqualPackage(rowkey, range.right);
+    // No packages.
+    // xxx xxx left xxx right xxx xxx
+    if (leftPackage == null && rightPackage == null) {
+      return 0;
+    }
+
+    // 1 packages.
+    // xxx xxx left ... right xxx xxx
+    else if (leftPackage != null && rightPackage != null
+        && leftPackage.getStartTime() == rightPackage.getStartTime()) {
+      return 1;
+    }
+    List<T> digests = new ArrayList<>();
+
+    // query left bound package.
+    long leftStartTime = leftPackage.getStartTime();
+    T leftDigest = (T) leftPackage
+        .getDigest(range.left, leftStartTime + leftPackage.getTimeWindow());
+    if (null != leftDigest) {
+      digests.add(leftDigest);
+    }
+
+    // query right bound package.
+    long rightStartTime = rightPackage.getStartTime();
+    T rightDigest = (T) rightPackage.getDigest(rightStartTime, range.right);
+    if (null != rightDigest) {
+      digests.add(rightDigest);
+    }
+
+    T leftAfterDigest = (T) reader.getAfterOrEqualDigest(rowkey, leftStartTime + 1);
+    T rightBeforeDigest = (T) reader.getBeforeOrEqualDigest(rowkey, rightStartTime - 1);
+
+    long downSerial = leftAfterDigest.getSerial();
+    long upSerial = rightBeforeDigest.getSerial();
+
+    if (downSerial > upSerial) {
+      return digests.size();
+    }
+
+    logger.debug("Query range(Serial Number) : " + downSerial + "-" + upSerial);
+    Pair<Long, Long> bounds = new Pair<>(downSerial, upSerial);
+    if (downSerial % 2 == 0) {
+      digests.add(leftAfterDigest);
+      bounds.left = bounds.left + 1;
+    }
+    if (upSerial % 2 == 1) {
+      digests.add(rightBeforeDigest);
+      bounds.right = bounds.right - 1;
+    }
+    if (bounds.left < bounds.right) {
+      T[] parentArray = queryBetween(bounds);
+      List<T> parentNodes = Arrays.asList(parentArray);
+      digests.addAll(parentNodes);
+    }
+    return digests.size();
+  }
+
+  public T query(Pair<Long, Long> range) {
+    if (leftPackage == null) {
+      leftPackage = reader.getBeforeOrEqualPackage(rowkey, range.left);
+    }
+    if (rightPackage == null) {
+      rightPackage = reader.getBeforeOrEqualPackage(rowkey, range.right);
+    }
+
+    // No packages.
+    // xxx xxx left xxx right xxx xxx
+    if (leftPackage == null && rightPackage == null) {
+      return null;
+    }
+
+    // 1 packages.
+    // xxx xxx left ... right xxx xxx
+    else if (leftPackage != null && rightPackage != null
+        && leftPackage.getStartTime() == rightPackage.getStartTime()) {
+      return (T) leftPackage.getDigest(range.left, range.right);
+    }
+
+    List<T> digests = new ArrayList<>();
+
+    // query left bound package.
+    long leftStartTime = leftPackage.getStartTime();
+    T leftDigest = (T) leftPackage
+        .getDigest(range.left, leftStartTime + leftPackage.getTimeWindow());
+    if (null != leftDigest) {
+      digests.add(leftDigest);
+    }
+
+    // query right bound package.
+    long rightStartTime = rightPackage.getStartTime();
+    T rightDigest = (T) rightPackage.getDigest(rightStartTime, range.right);
+    if (null != rightDigest) {
+      digests.add(rightDigest);
+    }
+
+    long time = System.currentTimeMillis();
+    T leftAfterDigest = (T) reader.getAfterOrEqualDigest(rowkey, leftStartTime + 1);
+    T rightBeforeDigest = (T) reader.getBeforeOrEqualDigest(rowkey, rightStartTime - 1);
+    System.out.println("this step time cost:" + (System.currentTimeMillis() - time));
+    long downSerial = leftAfterDigest.getSerial();
+    long upSerial = rightBeforeDigest.getSerial();
+
+    if (downSerial > upSerial) {
+      return (T) DataDigestUtil.aggregate(rowkey, range.left,
+          (digests.toArray(new FloatDigest[]{})));
+    }
+
+    System.out.println("Query range(Serial Number) : " + downSerial + "-" + upSerial);
+    Pair<Long, Long> bounds = new Pair<>(downSerial, upSerial);
+    if (downSerial % 2 == 0) {
+      digests.add(leftAfterDigest);
+      bounds.left = bounds.left + 1;
+    }
+    if (upSerial % 2 == 1) {
+      digests.add(rightBeforeDigest);
+      bounds.right = bounds.right - 1;
+    }
+    if (bounds.left < bounds.right) {
+      T[] parentArray = queryBetween(bounds);
+      List<T> parentNodes = Arrays.asList(parentArray);
+      digests.addAll(parentNodes);
+    }
+
+    T result = (T) DataDigestUtil.aggregate(rowkey, range.left,
+        (digests.toArray(new FloatDigest[]{})));
+    this.clean();
+    return result;
+  }
+
+  public T[] queryBetween(Pair<Long, Long> range) {
+    long lowerSerial = range.left; // leftLeafNumber
+    long upperSerial = range.right; // rightLeafNumber
+
+    long upperCode = DigestUtil.serialToCode(upperSerial); // rightNodeNumber
+    long parentCode = DigestUtil.getRootCodeBySerialNum(upperSerial); // rootNodeNumber
+    long leftCode = DigestUtil.getLeftestCode(parentCode, upperCode); // leftestNodeNumber
+    long leftSerial = DigestUtil.codeToSerial(leftCode); // leftestLeafNumber
+
+    List<Long> parentCodes = new ArrayList<>();
+    while (leftSerial != lowerSerial) {
+      if (leftSerial > lowerSerial) {
+        parentCodes.add(-parentCode);
+        upperSerial = leftSerial - 1;
+        upperCode = DigestUtil.serialToCode(upperSerial);
+        parentCode = DigestUtil.getRootCodeBySerialNum(upperSerial);
+      } else {
+        --parentCode;
+      }
+      leftCode = DigestUtil.getLeftestCode(parentCode, upperCode);
+      leftSerial = DigestUtil.codeToSerial(leftCode);
+    }
+
+    /*
+     * 将负的 code number 当做时间戳加到 candidate set 中
+     */
+    parentCodes.add(-parentCode);
+    Long[] codeArray = parentCodes.toArray(new Long[0]);
+
+    long start = System.currentTimeMillis();
+    System.out.println("query nodes size: " + codeArray.length);
+    T[] parentDigests = (T[]) reader.getDigests(rowkey, codeArray);
+    System.out.println(System.currentTimeMillis() - start);
+
+    return parentDigests;
+  }
+
+  private Set<Long> getQueryNodes(long leftLeafNumber, long rightLeafNumber) {
+    long leftestLeafNumber = PisaIndexNode.getLeftestLeafNumber(rightLeafNumber);
+    long rootNodeNumber = PisaIndexNode.getRootNodeNumber(rightLeafNumber);
+
+    Set<Long> parentNodes = new HashSet<>();
+    while (leftestLeafNumber >= leftLeafNumber) {
+      parentNodes.add(rootNodeNumber);
+      if (leftestLeafNumber == leftLeafNumber) {
+        break;
+      }
+      rightLeafNumber = leftestLeafNumber - 1;
+      rootNodeNumber = DigestUtil.getRootCodeBySerialNum(rightLeafNumber);
+      leftestLeafNumber = PisaIndexNode.getLeftestLeafNumber(rightLeafNumber);
+    }
+
+    return parentNodes;
+  }
+
+  private static class PisaIndexHolder {
+
+    static final PisaIndex INSTANCE = new PisaIndex();
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java
new file mode 100644
index 0000000..4c8c0ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+
+public class PisaIndexNode {
+
+  private long leafNumber;
+  private long nodeNumber;
+  private int level;
+  private boolean isInMem;
+  private Statistics statistics;
+
+  public PisaIndexNode(long leafNumber, Statistics statistics) {
+    this.leafNumber = leafNumber;
+    this.nodeNumber = getNodeNumberByLeafNumber(leafNumber);
+    this.level = 1;
+    this.statistics = statistics;
+    this.isInMem = true;
+  }
+
+  public PisaIndexNode(long nodeNumber, int level, Statistics statistics) {
+    this.nodeNumber = nodeNumber;
+    this.level = level;
+    this.statistics = statistics;
+    this.isInMem = true;
+  }
+
+  public long getLeafNumber() throws IndexException {
+    if (level != 0) {
+      throw new IndexException("Internal node doesn't have leaf number");
+    }
+    return leafNumber;
+  }
+
+  public long getNodeNumber() {
+    return nodeNumber;
+  }
+
+  public Statistics getStatistics() {
+    return statistics;
+  }
+
+  public int getLevel() {
+    return level;
+  }
+
+  public boolean isInMem() {
+    return isInMem;
+  }
+
+  public void setInMem(boolean inMem) {
+    isInMem = inMem;
+  }
+
+  public static long getNodeNumberByLeafNumber(long leafNumber) {
+    if (leafNumber % 2 == 1) {
+      return getRootNodeNumber(leafNumber);
+    } else {
+      return getRootNodeNumber(leafNumber - 1) + 1;
+    }
+  }
+
+  public static long getRootNodeNumber(long leafNumber) {
+    long count = 0;
+    long temp = leafNumber;
+    while (temp != 0) {
+      temp &= (temp - 1);
+      count++;
+    }
+    return (leafNumber << 1) - count;
+  }
+
+  public static long getLeafNumberByNodeNumber(long nodeNumber) {
+    long i = nodeNumber / 2;
+    while (i <= nodeNumber) {
+      if (getNodeNumberByLeafNumber(i) == nodeNumber) {
+        return i;
+      }
+      i++;
+    }
+    return -1L;
+  }
+
+  public static long getLeftestLeafNumber(long rightestLeafNumber) {
+    long root = getRootNodeNumber(rightestLeafNumber);
+    long depth = root - getNodeNumberByLeafNumber(rightestLeafNumber);
+    long nodesOfTree = 2 << depth;
+    return getLeafNumberByNodeNumber(root - nodesOfTree + 2L);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java
new file mode 100644
index 0000000..b7f9015
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class PisaIndexTree {
+
+  private Map<Long, Statistics> nodeNumberStatisticMap;
+
+  public PisaIndexTree(Map<Long, Statistics> nodeNumberStatisticMap) {
+    this.nodeNumberStatisticMap = nodeNumberStatisticMap;
+  }
+
+  public Map<Long, Statistics> getNodeNumberStatisticMap() {
+    return nodeNumberStatisticMap;
+  }
+
+  public int serialize(OutputStream outputStream) throws IOException, IndexException {
+    int byteLen = ReadWriteIOUtils.write(nodeNumberStatisticMap.size(), outputStream);
+    for (Map.Entry<Long, Statistics> entry : nodeNumberStatisticMap.entrySet()) {
+      long nodeNumber = entry.getKey();
+      byteLen += ReadWriteIOUtils.write(nodeNumber, outputStream);
+      byteLen += entry.getValue().serialize(outputStream);
+    }
+    return byteLen;
+  }
+
+  public static PisaIndexTree deserialize(InputStream inputStream, TSDataType dataType)
+      throws IOException {
+    int size = ReadWriteIOUtils.readInt(inputStream);
+    Map<Long, Statistics> nodeNumberStatisticMap = new HashMap<>();
+    for (int i = 0; i < size; i++) {
+      nodeNumberStatisticMap.put(ReadWriteIOUtils.readLong(inputStream),
+          Statistics.deserialize(inputStream, dataType));
+    }
+    return new PisaIndexTree(nodeNumberStatisticMap);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/QueryData.java b/server/src/main/java/org/apache/iotdb/db/index/QueryData.java
new file mode 100644
index 0000000..d06a336
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/QueryData.java
@@ -0,0 +1,31 @@
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+// query data in cassandra or mysql using pisa
+public class QueryData {
+
+  public static void main(String[] args) {
+    args = new String[]{"key", "0", "16384000000"};
+    if (args.length != 3) {
+      System.out.println("key starttime, endtime");
+      return;
+    }
+    String key = args[0];
+    long start = Long.valueOf(args[1]);
+    long end = Long.valueOf(args[2]);
+    PisaIndex<FloatDigest> pisaIndex = new PisaIndex<>();
+    System.out.println(String.format("will search %d nodes...",
+        pisaIndex.queryPlan(new Pair<Long, Long>(start, end))));
+
+    long time = System.currentTimeMillis();
+//		FloatDigest digest= pisaIndex.queryV2(new Pair<Long,Long>(start,end));
+    FloatDigest digest = pisaIndex.query(new Pair<Long, Long>(start, end));
+    time = System.currentTimeMillis() - time;
+
+    System.out.println("digest time cost:" + time);
+    System.out.println("count value:" + digest.getCount());
+    System.exit(1);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java b/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java
new file mode 100644
index 0000000..298b8c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java
@@ -0,0 +1,54 @@
+package org.apache.iotdb.db.index.storage;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * A class that contains configuration properties for the cassandra node it runs within.
+ *
+ * Properties declared as volatile can be mutated via JMX.
+ */
+public class Config {
+
+  public static String storage_engine = "local";
+  public static String digest_cf = "digest";
+  public static String data_cf = "data";
+  public static long timeWindow = 100L;
+
+  //time series name
+  public static String timeSeriesName = "key";
+
+  //total number of points: Math.pow(2,total)
+  public static String totals = "10";
+
+  public static boolean writePackages = true;
+
+
+  public static void load(String path) {
+    if (path == null) {
+      return;
+    }
+
+    Properties properties = new Properties();
+
+    try {
+      FileInputStream fileInputStream = new FileInputStream(path);
+      properties.load(fileInputStream);
+      properties.putAll(System.getenv());
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    storage_engine = properties.getOrDefault("storage_engine", storage_engine).toString();
+    digest_cf = properties.getOrDefault("digest_cf", digest_cf).toString();
+    data_cf = properties.getOrDefault("data_cf", data_cf).toString();
+    timeSeriesName = properties.getOrDefault("timeSeriesName", timeSeriesName).toString();
+
+    timeWindow = Long
+        .parseLong(properties.getOrDefault("timeWindow", String.valueOf(timeWindow)).toString());
+
+    writePackages = Boolean
+        .parseBoolean(properties.getOrDefault("writePackages", writePackages).toString());
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java b/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java
new file mode 100644
index 0000000..9f4327f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.db.index.storage;
+
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.memory.FakeByteStore;
+import org.apache.iotdb.db.index.storage.memory.FakeStore;
+
+public class StorageFactory {
+
+  private static FakeStore fakeStore = new FakeStore();
+  private static FakeByteStore fakeByteStore = new FakeByteStore();
+
+  public static IBackendReader getBackaBackendReader() {
+    String storageClass = Config.storage_engine;
+    switch (storageClass) {
+      case "local":
+        return fakeStore;
+      default:
+        return fakeByteStore;
+    }
+  }
+
+  public static IBackendWriter getBackaBackendWriter() {
+    String storageClass = Config.storage_engine;
+    switch (storageClass) {
+      case "local":
+        return fakeStore;
+      default:
+        return fakeByteStore;
+    }
+  }
+
+  public static IBackendModelCreator getBackendModelCreator() {
+    String storageClass = Config.storage_engine;
+    switch (storageClass) {
+      case "local":
+        return fakeStore;
+      default:
+        return fakeByteStore;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java
new file mode 100644
index 0000000..d5c7e1f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+import java.util.List;
+
+
+public interface IBackendModelCreator {
+
+  /**
+   * initialize the schema. create the keyspace and a group of columnfamilies.
+   */
+  public void initialize(String ks, int replicaFactor, List<String> columnfamilies)
+      throws Exception;
+
+  /**
+   * add a column Family into StorageSystem.
+   *
+   * @param ks database name
+   * @param cf column family name
+   */
+  public void addColumnFamily(String ks, String cf) throws Exception;
+
+  public void addFloatColumnFamily(String ks, String cf) throws Exception;
+
+  /**
+   * add column families in to StorageSystem. <br> this method is recommended when you want to
+   * create many cfs in a short time (When the storageSystem is Cassandra, because Cassandra has
+   * some performance problem if u create cf one by one).
+   */
+  public void addColumnFamilies(String ks, List<String> cfs) throws Exception;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java
new file mode 100644
index 0000000..775c264
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+
+public interface IBackendReader {
+
+  byte[] getBytes(String key, String cf, long startTime);
+
+  /**
+   * giving some DataPackage's startTime,return these digests from Cassandra in one command
+   */
+  FloatDigest[] getDigests(String key, Long[] timeStamps);
+
+  FloatDigest getBeforeOrEqualDigest(String key, long timestamp);
+
+  FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp);
+
+  FloatDigest getAfterOrEqualDigest(String key, long timestamp);
+
+  /**
+   * get latest one data digest after the timestamp if the timestamp belongs to a datapackage (not
+   * the starttime of a datapackage), the method will return the datapackage. SO user NEED to decide
+   * whether allign the timestamp
+   */
+  FloatDigest getLatestDigest(String key) throws Exception;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java
new file mode 100644
index 0000000..e2c729a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+
+public interface IBackendWriter {
+
+  /**
+   * @param startTimestamp an align time.
+   */
+  void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+      throws Exception;
+
+  /**
+   * @param startTimestamp an align time.
+   */
+  void write(String key, String cf, long startTimestamp, FloatDigest digest)
+      throws Exception;
+
+  void write(String key, String cf, long startTimestamp, byte[] digest)
+      throws Exception;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java
new file mode 100644
index 0000000..458bfb8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java
@@ -0,0 +1,140 @@
+package org.apache.iotdb.db.index.storage.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.storage.model.serializer.FixWindowPackageSerializer;
+import org.apache.iotdb.db.index.storage.model.serializer.FloatDigestSerializer;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * this fake store has only one cf and one
+ */
+
+public class FakeByteStore implements IBackendModelCreator, IBackendWriter, IBackendReader {
+
+  private Map<String, TreeMap<Long, byte[]>> data = new HashMap<>();
+  private Map<String, TreeMap<Long, byte[]>> digests = new HashMap<>();
+
+  private FixWindowPackageSerializer dataSerializer = FixWindowPackageSerializer.getInstance();
+  private FloatDigestSerializer digestSerializer = FloatDigestSerializer.getInstance();
+
+  @Override
+  public void initialize(String ks, int replicaFactor, List<String> columnFamilies)
+      throws Exception {
+  }
+
+  @Override
+  public void addColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  @Override
+  public void addColumnFamilies(String ks, List<String> cfs) throws Exception {
+  }
+
+  @Override
+  public FloatDigest[] getDigests(String key, Long[] timeStamps) {
+    List<FloatDigest> list = new ArrayList<>();
+    Map<Long, byte[]> map = digests.get(key);
+    if (map == null) {
+      return null;
+    }
+    byte[] pkg;
+    for (long time : timeStamps) {
+      if ((pkg = map.get(time)) != null) {
+        list.add(digestSerializer.deserialize(key, time, pkg));
+      }
+    }
+    return list.toArray(new FloatDigest[]{});
+  }
+
+  @Override
+  public FloatDigest getBeforeOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, byte[]> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    Map.Entry<Long, byte[]> entry = map.floorEntry(timestamp);
+    if (entry == null) {
+      return null;
+    }
+    return digestSerializer.deserialize(key, entry.getKey(), entry.getValue());
+  }
+
+  @Override
+  public FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp) {
+    TreeMap<Long, byte[]> map = data.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    Map.Entry<Long, byte[]> entry = map.floorEntry(timestamp);
+    if (entry == null) {
+      return null;
+    }
+    return dataSerializer.deserialize(key, entry.getKey(), entry.getValue());
+  }
+
+  @Override
+  public FloatDigest getAfterOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, byte[]> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    Map.Entry<Long, byte[]> entry = map.ceilingEntry(timestamp);
+    if (entry == null) {
+      return null;
+    }
+    return digestSerializer.deserialize(key, entry.getKey(), entry.getValue());
+  }
+
+  @Override
+  public FloatDigest getLatestDigest(String key) throws Exception {
+    List<Pair<Long, FloatDigest>> list = new ArrayList<>();
+    TreeMap<Long, byte[]> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    return digestSerializer.deserialize(key, map.lastKey(), map.lastEntry().getValue());
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+      throws Exception {
+    TreeMap<Long, byte[]> map = data.computeIfAbsent(key, k -> new TreeMap<>());
+    map.put(startTimestamp, dataSerializer.serialize(dp));
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, FloatDigest digest)
+      throws Exception {
+    TreeMap<Long, byte[]> map = this.digests.computeIfAbsent(key, k -> new TreeMap<>());
+    map.put(startTimestamp, digestSerializer.serialize(digest));
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, byte[] digest) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addFloatColumnFamily(String ks, String cf) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public byte[] getBytes(String key, String cf, long startTime) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java
new file mode 100644
index 0000000..0c23dc2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java
@@ -0,0 +1,131 @@
+package org.apache.iotdb.db.index.storage.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * this fake store has only one cf and one
+ */
+public class FakeStore implements IBackendModelCreator, IBackendWriter, IBackendReader {
+
+  private Map<String, TreeMap<Long, FixWindowPackage>> data = new HashMap<String, TreeMap<Long, FixWindowPackage>>();
+  private Map<String, TreeMap<Long, FloatDigest>> digests = new HashMap<String, TreeMap<Long, FloatDigest>>();
+
+  @Override
+  public void initialize(String ks, int replicaFactor, List<String> columnfamilies)
+      throws Exception {
+  }
+
+  @Override
+  public void addColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  @Override
+  public void addColumnFamilies(String ks, List<String> cfs) throws Exception {
+  }
+
+  @Override
+  public FloatDigest[] getDigests(String key, Long[] timeStamps) {
+    List<FloatDigest> list = new ArrayList<>();
+    Map<Long, FloatDigest> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    FloatDigest pkg = null;
+    for (long time : timeStamps) {
+      if ((pkg = map.get(time)) != null) {
+        list.add(pkg);
+      }
+    }
+    return list.toArray(new FloatDigest[]{});
+  }
+
+  @Override
+  public FloatDigest getBeforeOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, FloatDigest> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    if (map.floorEntry(timestamp) == null) {
+      return null;
+    }
+    return map.floorEntry(timestamp).getValue();
+  }
+
+  @Override
+  public FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp) {
+    TreeMap<Long, FixWindowPackage> map = data.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    if (map.floorEntry(timestamp) == null) {
+      return null;
+    }
+    return map.floorEntry(timestamp).getValue();
+  }
+
+  @Override
+  public FloatDigest getAfterOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, FloatDigest> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    if (map.ceilingKey(timestamp) == null) {
+      return null;
+    }
+    return map.ceilingEntry(timestamp).getValue();
+  }
+
+  @Override
+  public FloatDigest getLatestDigest(String key) throws Exception {
+    List<Pair<Long, FloatDigest>> list = new ArrayList<>();
+    TreeMap<Long, FloatDigest> map = digests.get(key);
+    if (map == null || map.size() == 0) {
+      return null;
+    }
+    return map.lastEntry().getValue();
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+      throws Exception {
+    TreeMap<Long, FixWindowPackage> map = data.computeIfAbsent(key, k -> new TreeMap<>());
+    map.put(startTimestamp, dp);
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, FloatDigest digest)
+      throws Exception {
+    TreeMap<Long, FloatDigest> map = this.digests.computeIfAbsent(key, k -> new TreeMap<>());
+    map.put(startTimestamp, digest);
+  }
+
+  @Override
+  public void write(String key, String cf, long startTimestamp, byte[] digest) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addFloatColumnFamily(String ks, String cf) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public byte[] getBytes(String key, String cf, long startTime) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java
new file mode 100644
index 0000000..fc679c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java
@@ -0,0 +1,112 @@
+package org.apache.iotdb.db.index.storage.model;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.DataDigestUtil;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class FixWindowPackage {
+
+  protected String key = null;
+  private Pair<Long, Long> timeWindow;
+  private TreeMap<Long, Object> treeMap;
+
+  public FixWindowPackage(String key, Pair<Long, Long> timeWindow) {
+    this.key = key;
+    this.timeWindow = timeWindow;
+    treeMap = new TreeMap<>();
+  }
+
+  public void add(Pair<Long, Float> data) {
+    treeMap.put(data.left, data.right);
+  }
+
+  public void add(long time, float value) {
+    treeMap.put(time, value);
+  }
+
+  public boolean isEmpty() {
+    return treeMap.isEmpty();
+  }
+
+  public boolean cover(Pair<Long, Float> data) {
+    return data.left >= timeWindow.left && data.left < timeWindow.right;
+  }
+
+  public boolean cover(long time) {
+    return time >= timeWindow.left && time < timeWindow.right;
+  }
+
+  public long getStartTime() {
+    return timeWindow.left;
+  }
+
+  public long getEndTime() {
+    return timeWindow.right;
+  }
+
+  public int size() {
+    return treeMap.size();
+  }
+
+  public long getTimeWindow() {
+    return timeWindow.right - timeWindow.left;
+  }
+
+  public TreeMap<Long, Object> getData() {
+    return treeMap;
+  }
+
+  public FloatDigest getDigest(long startTime, long endTime) {
+    Pair<Long, Long> regularRange = FixWindowPackage
+        .rangeRegular(startTime, endTime, timeWindow.left, timeWindow.right);
+    long actStartTime = regularRange.left;
+    long actEndTime = regularRange.right;
+
+    SortedMap<Long, Object> dataPoints = treeMap.subMap(actStartTime, actEndTime + 1);
+    if (dataPoints.size() < 1) {
+      return new FloatDigest(true);
+    }
+
+    return DataDigestUtil.getDigest(key, timeWindow.left, getTimeWindow(), dataPoints);
+  }
+
+  public static Pair<Long, Long> rangeRegular(long queryStartTime, long queryEndTime,
+      long startTime, long endTime) {
+    long actStartTime = queryStartTime;
+    long actEndTime = queryEndTime;
+
+    if (queryStartTime > endTime) {
+      actStartTime = endTime;
+      actEndTime = endTime;
+    } else if (queryEndTime < startTime) {
+      actStartTime = startTime;
+      actEndTime = startTime;
+    } else {
+      if (queryStartTime == -1 || queryStartTime < startTime) {
+        actStartTime = startTime;
+      }
+      if (queryEndTime == -1 || queryEndTime > endTime) {
+        actEndTime = endTime;
+      }
+
+      if (actEndTime < actStartTime) {
+        actEndTime = actStartTime;
+      }
+    }
+
+    return new Pair<>(actStartTime, actEndTime);
+  }
+
+
+  public FloatDigest getDigest() {
+    return getDigest(timeWindow.left, timeWindow.right);
+  }
+
+  @Override
+  public String toString() {
+    return treeMap.toString();
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java
new file mode 100644
index 0000000..67d4f7e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java
@@ -0,0 +1,35 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import org.apache.iotdb.db.index.FloatDigest;
+
+public class DigestSerializeFactory {
+
+  private final String FLOAT = "FloatDigest";
+
+  private final String BTREENODE = "BtreeNode";
+
+  private static DigestSerializeFactory instance = new DigestSerializeFactory();
+
+  private FloatDigestSerializer floatSerializer = FloatDigestSerializer.getInstance();
+
+  private DigestSerializeFactory() {
+  }
+
+  public static DigestSerializeFactory getInstance() {
+    return instance;
+  }
+
+  public byte[] serialize(FloatDigest dataDigest) {
+    String className = dataDigest.getClass().getSimpleName();
+    switch (className) {
+      case FLOAT:
+        return floatSerializer.serialize((FloatDigest) dataDigest);
+      default:
+        return null;
+    }
+  }
+
+  public FloatDigest deserializeFloatDigest(String key, long timestamp, byte[] bytes) {
+    return floatSerializer.deserialize(key, timestamp, bytes);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java
new file mode 100644
index 0000000..1cb8f8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.util.ArrayList;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.MyBytes;
+
+
+public class DigestSerializer {
+
+  public byte[] serialize(FloatDigest FloatDigest) {
+    ArrayList<byte[]> byteList = new ArrayList<>();
+
+    byte[] aBytes = MyBytes.longToBytes(FloatDigest.getTimeWindow());
+    byteList.add(aBytes);
+
+    aBytes = MyBytes.longToBytes(FloatDigest.getCode());
+    byteList.add(aBytes);
+
+    aBytes = MyBytes.longToBytes(FloatDigest.getSerial());
+    byteList.add(aBytes);
+
+    return MyBytes.concatByteArrayList(byteList);
+  }
+
+  public FloatDigest deserialize(String key, long startTime, byte[] bytes) {
+
+    int position = 1;
+    byte[] aBytes = MyBytes.subBytes(bytes, position, 8);
+    long timeWindow = MyBytes.bytesToLong(aBytes);
+
+    position += 8;
+    aBytes = MyBytes.subBytes(bytes, position, 8);
+    long code = MyBytes.bytesToLong(aBytes);
+
+    position += 8;
+    aBytes = MyBytes.subBytes(bytes, position, 8);
+    long serial = MyBytes.bytesToLong(aBytes);
+
+    return new FloatDigest(key, startTime, timeWindow, code, serial);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java
new file mode 100644
index 0000000..e66a121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java
@@ -0,0 +1,77 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.utils.MyBytes;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class FixWindowPackageSerializer {
+
+  protected static FixWindowPackageSerializer instance = new FixWindowPackageSerializer();
+
+  private FixWindowPackageSerializer() {
+  }
+
+  public static FixWindowPackageSerializer getInstance() {
+    return instance;
+  }
+
+  public byte[] serialize(FixWindowPackage dataPackage) {
+
+    ArrayList<byte[]> byteList = new ArrayList<>();
+    byte[] aBytes;
+
+    //8byte: time window
+    aBytes = MyBytes.longToBytes(dataPackage.getTimeWindow());
+    byteList.add(aBytes);
+    //4byte: size
+    aBytes = MyBytes.intToBytes(dataPackage.size());
+    byteList.add(aBytes);
+
+    //size * (8byte long + 4byte float): data
+    TreeMap<Long, Object> data = dataPackage.getData();
+    for (Map.Entry<Long, Object> entry : data.entrySet()) {
+      aBytes = MyBytes.longToBytes(entry.getKey());
+      byteList.add(aBytes);
+      aBytes = MyBytes.floatToBytes((float) entry.getValue());
+      byteList.add(aBytes);
+    }
+
+    return MyBytes.concatByteArrayList(byteList);
+  }
+
+  public FixWindowPackage deserialize(String key, long startTime, byte[] bytes) {
+
+    int position = 0;
+
+    //8byte: time window
+    byte[] aBytes = MyBytes.subBytes(bytes, position, 8);
+    long timeWindow = MyBytes.bytesToLong(aBytes);
+    position += 8;
+
+    //4byte: size,
+    aBytes = MyBytes.subBytes(bytes, position, 4);
+    int size = MyBytes.bytesToInt(aBytes);
+    position += 4;
+
+    //size * (8byte long + 4byte float): data
+    FixWindowPackage pkg = new FixWindowPackage(key,
+        new Pair<Long, Long>(startTime, startTime + timeWindow));
+    long time;
+    float value;
+    for (int i = 0; i < size; ++i) {
+      aBytes = MyBytes.subBytes(bytes, position, 8);
+      time = MyBytes.bytesToLong(aBytes);
+      position += 8;
+      aBytes = MyBytes.subBytes(bytes, position, 4);
+      value = MyBytes.bytesToFloat(aBytes);
+      position += 4;
+      pkg.add(time, value);
+    }
+
+    return pkg;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java
new file mode 100644
index 0000000..61a395c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.MyBytes;
+
+
+public class FloatDigestSerializer extends DigestSerializer
+{
+	protected static FloatDigestSerializer instance = new FloatDigestSerializer();
+
+	private FloatDigestSerializer() {
+
+	}
+
+	public static FloatDigestSerializer getInstance() {
+		return instance;
+	}
+
+	public byte[] serialize(FloatDigest floatDigest) {
+		ArrayList<byte[]> byteList = new ArrayList<>();
+
+		byte[] aBytes = new byte[1];
+
+		aBytes[0] = 2;
+		byteList.add(aBytes);
+
+		aBytes = super.serialize(floatDigest);
+		byteList.add(aBytes);
+
+		aBytes = MyBytes.floatToBytes(floatDigest.getMax());
+		byteList.add(aBytes);
+
+		aBytes = MyBytes.floatToBytes(floatDigest.getMin());
+		byteList.add(aBytes);
+
+		aBytes = MyBytes.longToBytes(floatDigest.getCount());
+		byteList.add(aBytes);
+
+		aBytes = MyBytes.floatToBytes(floatDigest.getAvg());
+		byteList.add(aBytes);
+
+		aBytes = MyBytes.StringToBytes(floatDigest.getSquareSum().toString());
+		byteList.add(aBytes);
+
+		return MyBytes.concatByteArrayList(byteList);
+	}
+
+	public FloatDigest deserialize(String key, long startTime, byte[] bytes) {
+
+		FloatDigest dataDigest = super.deserialize(key, startTime, bytes);
+
+		int position = 1 + 8 + 8 + 8;
+		byte[] aBytes = MyBytes.subBytes(bytes, position, 4);
+		float max = MyBytes.bytesToFloat(aBytes);
+
+		position += 4;
+		aBytes = MyBytes.subBytes(bytes, position, 4);
+		float min = MyBytes.bytesToFloat(aBytes);
+
+		position += 4;
+		aBytes = MyBytes.subBytes(bytes, position, 8);
+		long count = MyBytes.bytesToLong(aBytes);
+
+		position += 8;
+		aBytes = MyBytes.subBytes(bytes, position, 4);
+		float avg = MyBytes.bytesToFloat(aBytes);
+
+		position += 4;
+		aBytes = MyBytes.subBytes(bytes, position, bytes.length-position);
+		BigDecimal squareSum = new BigDecimal(MyBytes.bytesToString(aBytes));
+
+		dataDigest.setMax(max);
+		dataDigest.setMin(min);
+		dataDigest.setCount(count);
+		dataDigest.setAvg(avg);
+		dataDigest.setSquareSum(squareSum);
+		return dataDigest;
+	}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java b/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java
new file mode 100644
index 0000000..209d1fb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java
@@ -0,0 +1,125 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.Vector;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.iotdb.db.index.FloatDigest;
+
+public class DataDigestUtil {
+
+  public static FloatDigest aggregate(String key, long startTime, FloatDigest[] dataDigests) {
+    if (dataDigests.length == 0) {
+      return null;
+    }
+    return floatAggregate(key, startTime, dataDigests);
+  }
+
+  public static FloatDigest getDigest(String key, long startTime, long timeWindow,
+      Collection<Object> datas) {
+    float[] floatDatas = ArrayUtils.toPrimitive(datas.toArray(new Float[]{}));
+    return floatDigest(key, startTime, timeWindow, floatDatas);
+  }
+
+  public static FloatDigest getDigest(String key, long startTime, long timeWindow,
+      SortedMap<Long, Object> dataPoints) {
+    return DataDigestUtil.partFloatDigest(key, startTime, timeWindow,
+        dataPoints);
+  }
+
+
+  public static FloatDigest floatDigest(String key, long startTime, long timeWindow,
+      float[] data) {
+    float max = Float.MIN_VALUE;
+    float min = Float.MAX_VALUE;
+    long count = 0;
+    double sum = 0;
+    Vector<Float> container = new Vector<>();
+    for (float aData : data) {
+      if (aData != Float.MAX_VALUE) {
+        if (max < aData) {
+          max = aData;
+        }
+        if (min > aData) {
+          min = aData;
+        }
+        ++count;
+        sum += aData;
+        container.add(aData);
+      }
+    }
+    float avg = (float) (sum / count);
+
+    BigDecimal squareSum = new BigDecimal(0);
+    for (Float float1 : container) {
+      Double squaredif = Math.pow(float1 - avg, 2.0);
+      squareSum = squareSum.add(new BigDecimal(squaredif));
+    }
+    return new FloatDigest(key, startTime, timeWindow, max, min, count, avg,
+        squareSum);
+  }
+
+  public static FloatDigest partFloatDigest(String key, long startTime, long timeWindow,
+      SortedMap<Long, Object> dataPoints) {
+    float max = Float.MIN_VALUE;
+    float min = Float.MAX_VALUE;
+    long count = 0;
+    double sum = 0;
+    BigDecimal squareSum = new BigDecimal(0);
+    Vector<Float> container = new Vector<>();
+    for (Entry<Long, Object> dataPoint : dataPoints.entrySet()) {
+      float value = (float) dataPoint.getValue();
+      if (value != Float.MAX_VALUE) {
+        if (max < value) {
+          max = value;
+        }
+        if (min > value) {
+          min = value;
+        }
+        count++;
+        sum += value;
+        container.add(value);
+      }
+    }
+    float avg = (float) (sum / count);
+
+    for (Float aFloat : container) {
+      double squareDif = Math.pow(aFloat - avg, 2.0);
+      squareSum = squareSum.add(new BigDecimal(squareDif));
+    }
+    return new FloatDigest(key, startTime, timeWindow, max, min, count, avg,
+        squareSum);
+  }
+
+  public static FloatDigest floatAggregate(String key, long startTime,
+      FloatDigest[] dataDigests) {
+    long timeWindow = 0L;
+
+    float max = Float.MIN_VALUE;
+    float min = Float.MAX_VALUE;
+    long count = 0;
+    double sum = 0;
+    BigDecimal squareSum = new BigDecimal(0);
+    for (FloatDigest dataDigest : dataDigests) {
+      FloatDigest floatDigest = (FloatDigest) dataDigest;
+      timeWindow = timeWindow + dataDigest.getTimeWindow();
+
+      if (max < floatDigest.getMax()) {
+        max = floatDigest.getMax();
+      }
+      if (min > floatDigest.getMin()) {
+        min = floatDigest.getMin();
+      }
+      sum += (floatDigest.getAvg() * floatDigest.getCount());
+      count += floatDigest.getCount();
+      squareSum.add(floatDigest.getSquareSum());
+    }
+    float avg = (float) (sum / count);
+
+    return new FloatDigest(key, startTime, timeWindow, max, min, count, avg,
+        squareSum);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java b/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java
new file mode 100644
index 0000000..f29c477
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java
@@ -0,0 +1,109 @@
+package org.apache.iotdb.db.index.utils;
+
+
+import com.clearspring.analytics.util.Pair;
+import org.apache.iotdb.db.index.storage.Config;
+
+public class DigestUtil {
+
+  // if left point, serialNo is the result of getRootCodeBySerialNum().
+  // else, serialNo is the result of getRootCodeBySerialNum() of its left bother - 1.
+  public static long serialToCode(long serialNumber) {
+    if (serialNumber % 2 == 1) {
+      return getRootCodeBySerialNum(serialNumber);
+    } else {
+      return getRootCodeBySerialNum(serialNumber - 1) + 1;
+    }
+  }
+
+  /**
+   * get the right leaf serial number of the code number
+   */
+  public static long codeToSerial(long code) {
+    long begin = code / 2;
+    for (long i = begin; ; ++i) {
+      if (getRootCodeBySerialNum(i) == code) {
+        return i;
+      }
+    }
+  }
+
+  public static long timeToCode(long time) {
+    long serial = time / Config.timeWindow + 1;
+    return DigestUtil.serialToCode(serial);
+  }
+
+
+  public static long regularTime(long time) {
+    return time - time % Config.timeWindow;
+  }
+
+  public static long timeToSerial(long time) {
+    return time / Config.timeWindow + 1;
+  }
+
+  public static void main(String[] args) {
+    System.out.println(getRootCodeBySerialNum(13));
+  }
+
+  // get root code number by serial number
+  public static long getRootCodeBySerialNum(long serialNumber) {
+    long count = 0;
+    long serialNum = serialNumber;
+    for (; serialNum != 0; ++count) {
+      serialNum &= (serialNum -  1);
+    }
+    return (serialNumber << 1) - count;
+  }
+
+  /**
+   * depth=depth of tree - 1
+   */
+  public static long getDistanceBetweenLeftestAndRoot(long depth) {
+    //2^(depth+1)
+    long nodesOfTree = 2 << depth;
+    long distance = nodesOfTree - 2L;
+    return distance;
+  }
+
+  public static long getLeftestCode(long root, long rightestCode) {
+    long depth = root - rightestCode;
+    long leftestCode = root - getDistanceBetweenLeftestAndRoot(depth);
+    return leftestCode;
+  }
+
+  private static long[] table2 = new long[129];
+
+  static {
+    for (int i = 0; i <= 128; i++) {
+      table2[i] = (long) Math.pow(2, i) - 1;
+    }
+  }
+
+  public static Pair<Long, Long> getChildrenCode(long parentCode) {
+    int i = 0;
+    long root = parentCode;
+    int skew = 0;
+    while (true) {
+      i = 0;
+      while (i <= 128) {
+        if (root == 1) {
+          return new Pair<Long, Long>(skew + 1L, skew + 1L);
+        } else if (root == 2) {
+          return new Pair<Long, Long>(skew + 2L, skew + 2L);
+        }
+        if (root == table2[i]) {
+          return new Pair<Long, Long>(root / 2 + skew, parentCode - 1);
+        }
+        if (root < table2[i]) {
+          i--;
+          break;
+        } else {
+          i++;
+        }
+      }
+      root -= table2[i];
+      skew += table2[i];
+    }
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java b/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java
new file mode 100644
index 0000000..a87ac49
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java
@@ -0,0 +1,47 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.util.Stack;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class ForestRootStack<T> {
+
+  private Stack<T> rootNodes = null;
+
+  public ForestRootStack() {
+    this.rootNodes = new Stack<T>();
+  }
+
+  public void push(T aRootNode) {
+    if (rootNodes == null) {
+      rootNodes = new Stack<T>();
+    }
+
+    rootNodes.push(aRootNode);
+  }
+
+  public Pair<T, T> popPair() {
+    if (rootNodes == null) {
+      return null;
+    }
+
+    T rightChild = rootNodes.pop();
+    T leftChild = rootNodes.pop();
+    return new Pair<>(leftChild, rightChild);
+  }
+
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (T t : rootNodes) {
+      stringBuilder.append(t).append(",");
+    }
+    return stringBuilder.toString();
+  }
+
+  public int size() {
+    return this.rootNodes.size();
+  }
+
+  public T get(int index) {
+    return this.rootNodes.get(index);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java b/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java
new file mode 100644
index 0000000..b2233bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java
@@ -0,0 +1,255 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+
+//TODO Kangrong comment by English
+public class MyBytes {
+
+  /**
+   * byte[] convert to int
+   */
+  public static byte[] intToBytes(int i) {
+    byte[] result = new byte[4];
+    result[0] = (byte) ((i >> 24) & 0xFF);
+    result[1] = (byte) ((i >> 16) & 0xFF);
+    result[2] = (byte) ((i >> 8) & 0xFF);
+    result[3] = (byte) (i & 0xFF);
+    return result;
+  }
+
+  /**
+   * byte[] convert to int
+   */
+  public static int bytesToInt(byte[] bytes) {
+    int value = 0;
+    // high bit to low
+    for (int i = 0; i < 4; i++) {
+      int shift = (4 - 1 - i) * 8;
+      value += (bytes[i] & 0x000000FF) << shift;
+    }
+    return value;
+  }
+
+  /**
+   * float杞崲byte
+   */
+  public static byte[] floatToBytes(float x) {
+    byte[] b = new byte[4];
+    int l = Float.floatToIntBits(x);
+    for (int i = 0; i < 4; i++) {
+      b[i] = new Integer(l).byteValue();
+      l = l >> 8;
+    }
+    return b;
+  }
+
+  public static int floatSize() {
+    return 4;
+  }
+
+  /**
+   * fill byte array b by x from offset position.
+   *
+   * @param b result
+   */
+  public static void floatToBytes(float x, byte[] b, int offset) {
+    assert b.length - offset >= 4;
+    int l = Float.floatToIntBits(x);
+    for (int i = offset; i < 4 + offset; i++) {
+      b[i] = new Integer(l).byteValue();
+      l = l >> 8;
+    }
+  }
+
+
+  /**
+   * 閫氳繃byte鏁扮粍鍙栧緱float
+   */
+  public static float bytesToFloat(byte[] b) {
+    int l;
+    l = b[0];
+    l &= 0xff;
+    l |= ((long) b[1] << 8);
+    l &= 0xffff;
+    l |= ((long) b[2] << 16);
+    l &= 0xffffff;
+    l |= ((long) b[3] << 24);
+    return Float.intBitsToFloat(l);
+  }
+
+  /**
+   * double杞崲byte
+   *
+   * @return byte[]
+   */
+  public static byte[] doubleToBytes(double data) {
+    byte[] bytes = new byte[8];
+    long value = Double.doubleToLongBits(data);
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = new Long(value).byteValue();
+      value = value >> 8;
+    }
+    return bytes;
+  }
+
+  public static int doubleSize() {
+    return 8;
+  }
+
+  /**
+   * 閫氳繃byte鏁扮粍鍙栧緱double
+   */
+  public static double bytesToDouble(byte[] bytes) {
+    long value = bytes[0];
+    value &= 0xff;
+    value |= ((long) bytes[1] << 8);
+    value &= 0xffff;
+    value |= ((long) bytes[2] << 16);
+    value &= 0xffffff;
+    value |= ((long) bytes[3] << 24);
+    value &= 0xffffffffL;
+    value |= ((long) bytes[4] << 32);
+    value &= 0xffffffffffL;
+    value |= ((long) bytes[5] << 40);
+    value &= 0xffffffffffffL;
+    value |= ((long) bytes[6] << 48);
+    value &= 0xffffffffffffffL;
+    value |= ((long) bytes[7] << 56);
+
+    return Double.longBitsToDouble(value);
+  }
+
+  /**
+   * float杞崲byte
+   */
+  public static byte[] boolToBytes(boolean x) {
+    byte[] b = new byte[1];
+    if (x) {
+      b[0] = 1;
+    } else {
+      b[0] = 0;
+    }
+    return b;
+  }
+
+  public static int boolSize() {
+    return 1;
+  }
+
+  /**
+   * float杞崲byte
+   */
+  public static byte[] boolToBytes(boolean x, byte[] b, int offset) {
+    if (x) {
+      b[offset] = 1;
+    } else {
+      b[offset] = 0;
+    }
+    return b;
+  }
+
+  /**
+   * 閫氳繃byte鏁扮粍鍙栧緱float
+   */
+  public static boolean bytesToBool(byte[] b) {
+    if (b.length != 1) {
+      return false;
+    }
+    if (b[0] == 0) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  public static byte[] longToBytes(long num) {
+    byte[] byteNum = new byte[8];
+    for (int ix = 0; ix < 8; ++ix) {
+      int offset = 64 - (ix + 1) * 8;
+      byteNum[ix] = (byte) ((num >> offset) & 0xff);
+    }
+    return byteNum;
+  }
+
+  public static int longSize() {
+    return 8;
+  }
+
+  public static byte[] longToBytes(long num, byte[] byteNum, int offset_) {
+    for (int ix = 0; ix < 8; ++ix) {
+      int offset = 64 - (ix + 1) * 8;
+      byteNum[ix + offset_] = (byte) ((num >> offset) & 0xff);
+    }
+    return byteNum;
+  }
+
+  public static long bytesToLong(byte[] byteNum) {
+    long num = 0;
+    for (int ix = 0; ix < 8; ++ix) {
+      num <<= 8;
+      num |= (byteNum[ix] & 0xff);
+    }
+    return num;
+  }
+
+  public static byte[] StringToBytes(String str) {
+    try {
+      return str.getBytes("UTF8");
+    } catch (UnsupportedEncodingException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  public static String bytesToString(byte[] byteStr) {
+    try {
+      return new String(byteStr, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  // concat two byteArray
+  public static byte[] concatByteArray(byte[] a, byte[] b) {
+    byte[] c = new byte[a.length + b.length];
+    System.arraycopy(a, 0, c, 0, a.length);
+    System.arraycopy(b, 0, c, a.length, b.length);
+    return c;
+  }
+
+  // concat two byteArray
+  public static byte[] concatByteArrayList(List<byte[]> list) {
+    int size = list.size();
+    int len = 0;
+    for (byte[] cs : list) {
+      len += cs.length;
+    }
+    byte[] result = new byte[len];
+    int pos = 0;
+    for (int i = 0; i < size; i++) {
+      int l = list.get(i).length;
+      System.arraycopy(list.get(i), 0, result, pos, l);
+      pos += l;
+    }
+    return result;
+  }
+
+  public static byte[] subBytes(byte[] src, int start, int length) {
+    if ((start + length) > src.length) {
+      return null;
+    }
+    if (length <= 0) {
+      return null;
+    }
+    byte[] result = new byte[length];
+    for (int i = 0; i < length; i++) {
+      result[i] = src[start + i];
+    }
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
index 6b41a44..c0ad8a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
@@ -118,6 +118,16 @@ public class MLogWriter {
     newLine();
   }
 
+  public void addIndex(String path, String indexType) throws IOException {
+    writer.write(String.format("%s,%s,%s", MetadataOperationType.ADD_INDEX, path, indexType));
+    newLine();
+  }
+
+  public void dropIndex(String path, String indexType) throws IOException {
+    writer.write(String.format("%s,%s,%s", MetadataOperationType.DROP_INDEX, path, indexType));
+    newLine();
+  }
+
   public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
     File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
     File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index e39d0ac..272b3d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,13 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -55,20 +86,6 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1140,7 +1157,7 @@ public class MManager {
    * @return deviceId
    */
   public String getDeviceId(String path) {
-    MNode deviceNode = null;
+    MNode deviceNode;
     try {
       deviceNode = getDeviceNode(path);
       path = deviceNode.getFullPath();
@@ -1667,6 +1684,48 @@ public class MManager {
   }
 
   /**
+   * Check whether the timeseries has index
+   */
+  public boolean checkIndex(String path, IndexType indexType) throws MetadataException {
+    lock.readLock().lock();
+    try {
+      return mtree.getSchema(path).isIndexed(indexType.toString());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Create index for timeseries
+   */
+  public void addIndex(String path, IndexType indexType) throws MetadataException, IOException {
+    lock.writeLock().lock();
+    try {
+      mtree.getSchema(path).setIndex(indexType.toString(), true);
+      if (!isRecovering) {
+        logWriter.addIndex(path, indexType.toString());
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Drop index for one timeseries
+   */
+  public void dropIndex(String path, IndexType indexType) throws MetadataException, IOException {
+    lock.writeLock().lock();
+    try {
+      mtree.getSchema(path).setIndex(indexType.toString(), false);
+      if (!isRecovering) {
+        logWriter.dropIndex(path, indexType.toString());
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Check whether the given path contains a storage group
    */
   boolean checkStorageGroupByPath(String path) {
@@ -1828,15 +1887,10 @@ public class MManager {
   }
 
   /**
-   * get schema for device.
-   * Attention!!!  Only support insertPlan and insertTabletsPlan
-   * @param deviceId
-   * @param measurementList
-   * @param plan
-   * @return
-   * @throws MetadataException
+   * get schema for device. Attention!!!  Only support insertPlan and insertTabletsPlan
    */
-  public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+  public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList,
+      PhysicalPlan plan) throws MetadataException {
     MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
 
     MNode deviceNode = null;
@@ -1851,7 +1905,8 @@ public class MManager {
           // could not create it
           if (!config.isAutoCreateSchemaEnabled()) {
             throw new MetadataException(String.format(
-              "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+                "Current deviceId[%s] does not contain measurement:%s", deviceId,
+                measurementList[i]));
           }
 
           // create it
@@ -1859,19 +1914,20 @@ public class MManager {
           TSDataType dataType = getTypeInLoc(plan, i);
 
           createTimeseries(
-            path.getFullPath(),
-            dataType,
-            getDefaultEncoding(dataType),
-            TSFileDescriptor.getInstance().getConfig().getCompressor(),
-            Collections.emptyMap());
+              path.getFullPath(),
+              dataType,
+              getDefaultEncoding(dataType),
+              TSFileDescriptor.getInstance().getConfig().getCompressor(),
+              Collections.emptyMap());
         }
 
-        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+        MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode,
+            measurementList[i]);
 
         // check type is match
         TSDataType insertDataType = null;
         if (plan instanceof InsertPlan) {
-          if (!((InsertPlan)plan).isNeedInferType()) {
+          if (!((InsertPlan) plan).isNeedInferType()) {
             // only when InsertPlan's values is object[], we should check type
             insertDataType = getTypeInLoc(plan, i);
           } else {
@@ -1883,11 +1939,11 @@ public class MManager {
 
         if (measurementNode.getSchema().getType() != insertDataType) {
           logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
-            measurementList[i], insertDataType, measurementNode.getSchema().getType());
+              measurementList[i], insertDataType, measurementNode.getSchema().getType());
           if (!config.isEnablePartialInsert()) {
             throw new MetadataException(String.format(
-              "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
-              measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+                "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+                measurementList[i], insertDataType, measurementNode.getSchema().getType()));
           } else {
             // mark failed measurement
             if (plan instanceof InsertTabletPlan) {
@@ -1910,7 +1966,7 @@ public class MManager {
         }
       } catch (MetadataException e) {
         logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
-          e.getMessage());
+            e.getMessage());
         if (config.isEnablePartialInsert()) {
           // mark failed measurement
           if (plan instanceof InsertPlan) {
@@ -1926,13 +1982,14 @@ public class MManager {
     return schemas;
   }
 
-  private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type) throws MetadataException {
+  private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type)
+      throws MetadataException {
     plan.getTypes()[loc] = type;
     try {
       switch (type) {
         case INT32:
           plan.getValues()[loc] =
-            Integer.parseInt(String.valueOf(plan.getValues()[loc]));
+              Integer.parseInt(String.valueOf(plan.getValues()[loc]));
           break;
         case INT64:
           plan.getValues()[loc] =
@@ -1956,7 +2013,8 @@ public class MManager {
           break;
       }
     } catch (ClassCastException e) {
-      logger.error("inconsistent type between client and server for " + e.getMessage() + " " + type);
+      logger
+          .error("inconsistent type between client and server for " + e.getMessage() + " " + type);
       throw new MetadataException(e.getMessage());
     } catch (NumberFormatException e) {
       logger.error("inconsistent type between type {} and value {}", type, plan.getValues()[loc]);
@@ -1984,49 +2042,43 @@ public class MManager {
         return conf.getDefaultTextEncoding();
       default:
         throw new UnSupportedDataTypeException(
-          String.format("Data type %s is not supported.", dataType.toString()));
+            String.format("Data type %s is not supported.", dataType.toString()));
     }
   }
 
   /**
-   * get dataType of plan, in loc measurements
-   * only support InsertPlan and InsertTabletPlan
-   * @param plan
-   * @param loc
-   * @return
-   * @throws MetadataException
+   * get dataType of plan, in loc measurements only support InsertPlan and InsertTabletPlan
    */
   private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
     TSDataType dataType;
     if (plan instanceof InsertPlan) {
       InsertPlan tPlan = (InsertPlan) plan;
-      dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+      dataType = TypeInferenceUtils
+          .getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
     } else if (plan instanceof InsertTabletPlan) {
       dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
-    }  else {
+    } else {
       throw new MetadataException(String.format(
-        "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+          "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
     }
     return dataType;
   }
 
   /**
-   * when insert, we lock device node for not create deleted time series
-   * before insert, we should call this function to lock the device node
-   * @param deviceId
+   * when insert, we lock device node for not create deleted time series before insert, we should
+   * call this function to lock the device node
    */
   public void lockInsert(String deviceId) throws MetadataException {
     getDeviceNodeWithAutoCreateAndReadLock(deviceId);
   }
 
   /**
-   * when insert, we lock device node for not create deleted time series
-   * after insert, we should call this function to unlock the device node
-   * @param deviceId
+   * when insert, we lock device node for not create deleted time series after insert, we should
+   * call this function to unlock the device node
    */
   public void unlockInsert(String deviceId) {
     try {
-      MNode mNode =getDeviceNode(deviceId);
+      MNode mNode = getDeviceNode(deviceId);
       mNode.readUnlock();
     } catch (MetadataException e) {
       // ignore the exception
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
index 3700972..98899c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
@@ -27,6 +27,8 @@ public class MetadataOperationType {
   public static final String CREATE_TIMESERIES = "0";
   public static final String DELETE_TIMESERIES = "1";
   public static final String SET_STORAGE_GROUP = "2";
+  public static final String ADD_INDEX = "3";
+  public static final String DROP_INDEX = "4";
   public static final String SET_TTL = "10";
   public static final String DELETE_STORAGE_GROUP = "11";
   public static final String CHANGE_OFFSET = "12";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index adb0b04..e78644a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,6 +18,43 @@
  */
 package org.apache.iotdb.db.qp.executor;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -37,10 +74,16 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.index.PisaIndex;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -49,8 +92,39 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.IndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -78,14 +152,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 public class PlanExecutor implements IPlanExecutor {
 
   private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
@@ -201,6 +267,9 @@ public class PlanExecutor implements IPlanExecutor {
       case CREATE_SCHEMA_SNAPSHOT:
         operateCreateSnapshot();
         return true;
+      case INDEX:
+        operateIndex((IndexPlan) plan);
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
@@ -834,6 +903,48 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private void operateIndex(IndexPlan indexPlan) throws QueryProcessException {
+    try {
+      String path = indexPlan.getPaths().get(0).getFullPath();
+      IndexType indexType = indexPlan.getIndexType();
+      if (!mManager.isPathExist(path)) {
+        throw new PathNotExistException(path);
+      }
+      PisaIndex pisaIndex = IndexManager.getIndexInstance(indexType);
+      if (pisaIndex == null) {
+        throw new UnSupportedIndexTypeException(indexType.toString());
+      }
+      switch (indexPlan.getIndexOperatorType()) {
+        case CREATE_INDEX:
+          if (mManager.checkIndex(path, indexType)) {
+            throw new IndexException(
+                String.format("Timeseries %s has already been indexed.", path));
+          }
+
+          if (pisaIndex.build(new Path(path))) {
+            mManager.addIndex(path, indexType);
+          }
+          break;
+        case DROP_INDEX:
+          if (!mManager.checkIndex(path, indexType)) {
+            throw new IndexException(
+                String.format("Timeseries %s hasn't been indexed.", path));
+          }
+          if (pisaIndex.drop(new Path(path))) {
+            mManager.dropIndex(path, indexType);
+          }
+          break;
+        default:
+          throw new IndexException(String
+              .format("Not supported index operation %s", indexPlan.getIndexOperatorType()));
+      }
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
   @Override
   public void update(Path path, long startTime, long endTime, String value) {
     throw new UnsupportedOperationException("update is not supported now");
@@ -856,14 +967,15 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
-    throws MetadataException {
-    return mManager.getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+      throws MetadataException {
+    return mManager
+        .getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
   }
 
   protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan insertTabletPlan)
-    throws MetadataException {
+      throws MetadataException {
     return mManager.getSeriesSchemas(insertTabletPlan.getDeviceId(),
-      insertTabletPlan.getMeasurements(), insertTabletPlan);
+        insertTabletPlan.getMeasurements(), insertTabletPlan);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java
new file mode 100644
index 0000000..0a6e5ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.crud;
+
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.qp.logical.RootOperator;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public final class IndexOperator extends RootOperator {
+
+  private Path path;
+  private final IndexOperatorType indexOperatorType;
+  private final IndexType indexType;
+
+  public IndexOperator(int tokenIntType, Path path, IndexOperatorType indexOperatorType, IndexType indexType) {
+    super(tokenIntType);
+    this.path = path;
+    this.indexOperatorType = indexOperatorType;
+    this.indexType = indexType;
+    operatorType = OperatorType.INDEX;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public IndexOperatorType getIndexOperatorType() {
+    return indexOperatorType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  public void setPath(Path path) {
+    this.path = path;
+  }
+
+  public enum IndexOperatorType {
+    CREATE_INDEX, DROP_INDEX
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java
new file mode 100644
index 0000000..0ce944d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+
+import static org.apache.iotdb.db.qp.logical.Operator.OperatorType.INDEX;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator.IndexOperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class IndexPlan extends PhysicalPlan {
+
+  private Path path;
+  private final IndexType indexType;
+  private final IndexOperatorType indexOperatorType;
+
+  public IndexPlan(Path path, IndexOperatorType indexOperatorType, IndexType indexType) {
+    super(false, INDEX);
+    this.path = path;
+    this.indexOperatorType = indexOperatorType;
+    this.indexType = indexType;
+  }
+
+  @Override
+  public List<Path> getPaths() {
+    List<Path> list = new ArrayList<>();
+    if (path != null) {
+      list.add(path);
+    }
+    return list;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  public IndexOperatorType getIndexOperatorType() {
+    return indexOperatorType;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 6b14f26..0600be0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -29,7 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import org.antlr.v4.runtime.tree.TerminalNode;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.RootOperator;
@@ -38,6 +40,8 @@ import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 import org.apache.iotdb.db.qp.logical.crud.FromOperator;
 import org.apache.iotdb.db.qp.logical.crud.InOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator.IndexOperatorType;
 import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectOperator;
@@ -79,6 +83,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AttributeClausesContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ConstantContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountNodesContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountTimeseriesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateIndexContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateRoleContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateSnapshotContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateTimeseriesContext;
@@ -87,6 +92,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DateExpressionContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStorageGroupContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteTimeseriesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropIndexContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropRoleContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropUserContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FillClauseContext;
@@ -733,6 +739,34 @@ public class LogicalGenerator extends SqlBaseBaseListener {
     initializedOperator = new ShowTTLOperator(storageGroups);
   }
 
+  @Override
+  public void enterCreateIndex(CreateIndexContext ctx) {
+    super.enterCreateIndex(ctx);
+    IndexType indexType;
+    try {
+      indexType = IndexType.getIndexType(ctx.ID().getText());
+    } catch (UnSupportedIndexTypeException e) {
+      throw new SQLParserException(e);
+    }
+    initializedOperator = new IndexOperator(SQLConstant.TOK_CREATE_INDEX,
+        parseFullPath(ctx.fullPath()), IndexOperatorType.CREATE_INDEX, indexType);
+    operatorType = SQLConstant.TOK_CREATE_INDEX;
+  }
+
+  @Override
+  public void enterDropIndex(DropIndexContext ctx) {
+    super.enterDropIndex(ctx);
+    IndexType indexType;
+    try {
+      indexType = IndexType.getIndexType(ctx.ID().getText());
+    } catch (UnSupportedIndexTypeException e) {
+      throw new SQLParserException(e);
+    }
+    initializedOperator = new IndexOperator(SQLConstant.TOK_DROP_INDEX,
+        parseFullPath(ctx.fullPath()), IndexOperatorType.DROP_INDEX, indexType);
+    operatorType = SQLConstant.TOK_DROP_INDEX;
+  }
+
   private String[] parsePrivilege(PrivilegesContext ctx) {
     List<TerminalNode> privilegeList = ctx.STRING_LITERAL();
     List<String> privileges = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index a72fcb0..2eebfd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator;
 import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator;
@@ -69,6 +70,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.IndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -206,6 +208,10 @@ public class PhysicalGenerator {
                 String.format(
                     "not supported operator type %s in ttl operation.", operator.getType()));
         }
+      case INDEX:
+        IndexOperator indexOperator = (IndexOperator) operator;
+        return new IndexPlan(indexOperator.getPath(),
+            indexOperator.getIndexOperatorType(), indexOperator.getIndexType());
       case LOAD_CONFIGURATION:
         LoadConfigurationOperatorType type = ((LoadConfigurationOperator) operator)
             .getLoadConfigurationOperatorType();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index fd359e1..d503aaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -57,18 +57,21 @@ public class SchemaUtils {
       logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(),
           e);
     }
-
   }
 
   public static List<TSDataType> getSeriesTypesByPath(Collection<Path> paths)
       throws MetadataException {
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Path path : paths) {
-      dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+      dataTypes.add(getSeriesTypeByPath(path));
     }
     return dataTypes;
   }
 
+  public static TSDataType getSeriesTypeByPath(Path path) throws MetadataException {
+    return MManager.getInstance().getSeriesType(path.getFullPath());
+  }
+
   /**
    * @param paths time series paths
    * @param aggregation aggregation function, may be null
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index cbd14d4..9638c45 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.TreeSet;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -280,7 +279,7 @@ public class MManagerBasicTest {
   }
 
   @Test
-  public void testMaximalSeriesNumberAmongStorageGroup() throws MetadataException, PathException {
+  public void testMaximalSeriesNumberAmongStorageGroup() throws MetadataException {
     MManager manager = MManager.getInstance();
     assertEquals(0, manager.getMaximalSeriesNumberAmongStorageGroups());
     manager.setStorageGroup("root.laptop");
diff --git a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
index 96af121..2ac6a79 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.executor;
 import java.io.IOException;
 import java.util.ArrayList;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
@@ -49,7 +48,8 @@ public class GroupByEngineDataSetTest {
     groupByTimePlan.setStartTime(startTime);
     groupByTimePlan.setEndTime(endTime);
 
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+        groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
       Pair pair = groupByEngine.nextTimePartition();
@@ -62,7 +62,7 @@ public class GroupByEngineDataSetTest {
   }
 
   @Test
-  public void test2() throws IOException, PathException, StorageEngineException {
+  public void test2() throws IOException, StorageEngineException {
     long queryId = 1000L;
     long unit = 3;
     long slidingStep = 5;
@@ -77,7 +77,8 @@ public class GroupByEngineDataSetTest {
     groupByTimePlan.setSlidingStep(slidingStep);
     groupByTimePlan.setStartTime(startTime);
     groupByTimePlan.setEndTime(endTime);
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+        groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
       Pair pair = groupByEngine.nextTimePartition();
@@ -90,7 +91,7 @@ public class GroupByEngineDataSetTest {
   }
 
   @Test
-  public void test3() throws IOException, PathException, StorageEngineException {
+  public void test3() throws IOException, StorageEngineException {
     long queryId = 1000L;
     long unit = 3;
     long slidingStep = 3;
@@ -105,7 +106,8 @@ public class GroupByEngineDataSetTest {
     groupByTimePlan.setSlidingStep(slidingStep);
     groupByTimePlan.setStartTime(startTime);
     groupByTimePlan.setEndTime(endTime);
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+        groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
       Pair pair = groupByEngine.nextTimePartition();
@@ -118,7 +120,7 @@ public class GroupByEngineDataSetTest {
   }
 
   @Test
-  public void test4() throws IOException, PathException, StorageEngineException {
+  public void test4() throws IOException, StorageEngineException {
     long queryId = 1000L;
     long unit = 3;
     long slidingStep = 3;
@@ -133,7 +135,8 @@ public class GroupByEngineDataSetTest {
     groupByTimePlan.setSlidingStep(slidingStep);
     groupByTimePlan.setStartTime(startTime);
     groupByTimePlan.setEndTime(endTime);
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+        groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
       Pair pair = groupByEngine.nextTimePartition();
@@ -146,7 +149,7 @@ public class GroupByEngineDataSetTest {
   }
 
   @Test
-  public void test5() throws IOException, PathException, StorageEngineException {
+  public void test5() throws IOException, StorageEngineException {
     long queryId = 1000L;
     long unit = 3;
     long slidingStep = 3;
@@ -163,7 +166,8 @@ public class GroupByEngineDataSetTest {
     groupByTimePlan.setEndTime(endTime);
     ArrayList<Object> aggrList = new ArrayList<>();
     aggrList.add(new CountAggrResult());
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+        groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
       Pair pair = groupByEngine.nextTimePartition();
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index a7f920b..d2c4943 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -59,7 +58,7 @@ public class SeriesAggregateReaderTest {
 
 
   @Before
-  public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 03bf2bf..b879685 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,7 +51,7 @@ public class SeriesReaderByTimestampTest {
   private List<TsFileResource> unseqResources = new ArrayList<>();
 
   @Before
-  public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index 41bca61..861ef47 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.query.reader.series;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -57,7 +56,7 @@ public class SeriesReaderTest {
 
 
   @Before
-  public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 58177fd..5c5d462 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -60,7 +59,7 @@ public class SeriesReaderTestUtil {
 
   public static void setUp(List<MeasurementSchema> measurementSchemas, List<String> deviceIds,
       List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
-      throws MetadataException, PathException, IOException, WriteProcessException {
+      throws MetadataException, IOException, WriteProcessException {
     MManager.getInstance().init();
     prepareSeries(measurementSchemas, deviceIds);
     prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds);
@@ -150,7 +149,7 @@ public class SeriesReaderTestUtil {
 
 
   private static void prepareSeries(List<MeasurementSchema> measurementSchemas,
-      List<String> deviceIds) throws MetadataException, PathException {
+      List<String> deviceIds) throws MetadataException {
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas.add(new MeasurementSchema("sensor" + i, TSDataType.INT32,
           encoding, CompressionType.UNCOMPRESSED));
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 08aa3ad..d9d27f8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -43,6 +43,8 @@ public enum TSStatusCode {
   TSFILE_PROCESSOR_ERROR(314),
   PATH_ILLEGAL(315),
   LOAD_FILE_ERROR(316),
+  INDEX_ERROR(317),
+  UNSUPPORTED_INDEX_TYPE_ERROR(318),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index acec7b4..114707d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -165,6 +165,31 @@ public abstract class Statistics<T> {
     }
   }
 
+  public void update(long time, Object value, TSDataType type) {
+    switch (type) {
+      case INT32:
+        update(time, (Integer) value);
+        break;
+      case INT64:
+        update(time, (Long) value);
+        break;
+      case TEXT:
+        update(time, (Binary) value);
+        break;
+      case BOOLEAN:
+        update(time, (Boolean) value);
+        break;
+      case DOUBLE:
+        update(time, (Double) value);
+        break;
+      case FLOAT:
+        update(time, (Float) value);
+        break;
+      default:
+        throw new UnknownColumnTypeException(type.toString());
+    }
+  }
+
   public void update(long time, boolean value) {
     if (time < this.startTime) {
       startTime = time;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 307c694..ba93419 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -70,7 +70,7 @@ public class TimeFilter {
   public static class TimeIn extends In {
 
     private TimeIn(Set<Long> values, boolean not) {
-      super(values, FilterType.TIME_FILTER,not);
+      super(values, FilterType.TIME_FILTER, not);
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 848e0b3..05ad9db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -25,8 +25,10 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
 import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
@@ -44,12 +46,15 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
  */
 public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
 
+  private static final long serialVersionUID = -2954441882827172377L;
+
   private String measurementId;
   private TSDataType type;
   private TSEncoding encoding;
   private TSEncodingBuilder encodingConverter;
   private CompressionType compressor;
   private Map<String, String> props = new HashMap<>();
+  private Set<String> indexSet;
 
   public MeasurementSchema() {
   }
@@ -88,6 +93,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     this.encoding = encoding;
     this.props = props == null ? Collections.emptyMap() : props;
     this.compressor = compressionType;
+    this.indexSet = new HashSet<>();
   }
 
   /**
@@ -172,6 +178,18 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     this.props = props;
   }
 
+  public boolean isIndexed(String indexType) {
+    return indexSet.contains(indexType);
+  }
+
+  public void setIndex(String indexType, boolean isIndexed) {
+    if (isIndexed) {
+      indexSet.add(indexType);
+    } else {
+      indexSet.remove(indexType);
+    }
+  }
+
   /**
    * function for getting time encoder.
    */
@@ -183,6 +201,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
 
   /**
    * get Encoder of value from encodingConverter by measurementID and data type.
+   *
    * @return Encoder for value
    */
   public Encoder getValueEncoder() {
@@ -293,5 +312,4 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     sc.addTail("]");
     return sc.toString();
   }
-
 }