You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/06/29 13:24:22 UTC
[incubator-iotdb] 01/01: PISA demo
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch pisa
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit cce4c1acf65d4372482e831873cf2dae7b88edb9
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Mon Jun 29 21:23:53 2020 +0800
PISA demo
---
.../org/apache/iotdb/IndexContractExample.java | 60 ++
.../main/java/org/apache/iotdb/IndexExample.java | 176 ++++++
.../main/java/org/apache/iotdb/JDBCExample.java | 2 +-
.../src/main/java/org/apache/iotdb/Solution.java | 0
.../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 4 +-
.../IndexException.java} | 22 +-
.../UnSupportedIndexTypeException.java} | 19 +-
.../db/exception/query/QueryProcessException.java | 4 +
.../db/exception/runtime/SQLParserException.java | 9 +-
.../org/apache/iotdb/db/index/FloatDigest.java | 232 ++++++++
.../org/apache/iotdb/db/index/IndexManager.java | 33 ++
.../java/org/apache/iotdb/db/index/LoadData.java | 46 ++
.../java/org/apache/iotdb/db/index/PisaIndex.java | 618 +++++++++++++++++++++
.../org/apache/iotdb/db/index/PisaIndexNode.java | 109 ++++
.../org/apache/iotdb/db/index/PisaIndexTree.java | 63 +++
.../java/org/apache/iotdb/db/index/QueryData.java | 31 ++
.../org/apache/iotdb/db/index/storage/Config.java | 54 ++
.../iotdb/db/index/storage/StorageFactory.java | 43 ++
.../storage/interfaces/IBackendModelCreator.java | 30 +
.../index/storage/interfaces/IBackendReader.java | 29 +
.../index/storage/interfaces/IBackendWriter.java | 23 +
.../db/index/storage/memory/FakeByteStore.java | 140 +++++
.../iotdb/db/index/storage/memory/FakeStore.java | 131 +++++
.../db/index/storage/model/FixWindowPackage.java | 112 ++++
.../model/serializer/DigestSerializeFactory.java | 35 ++
.../storage/model/serializer/DigestSerializer.java | 41 ++
.../serializer/FixWindowPackageSerializer.java | 77 +++
.../model/serializer/FloatDigestSerializer.java | 81 +++
.../iotdb/db/index/utils/DataDigestUtil.java | 125 +++++
.../apache/iotdb/db/index/utils/DigestUtil.java | 109 ++++
.../iotdb/db/index/utils/ForestRootStack.java | 47 ++
.../org/apache/iotdb/db/index/utils/MyBytes.java | 255 +++++++++
.../org/apache/iotdb/db/metadata/MLogWriter.java | 10 +
.../org/apache/iotdb/db/metadata/MManager.java | 164 ++++--
.../iotdb/db/metadata/MetadataOperationType.java | 2 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 140 ++++-
.../iotdb/db/qp/logical/crud/IndexOperator.java | 59 ++
.../iotdb/db/qp/physical/crud/IndexPlan.java | 61 ++
.../iotdb/db/qp/strategy/LogicalGenerator.java | 34 ++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 6 +
.../org/apache/iotdb/db/utils/SchemaUtils.java | 7 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 3 +-
.../query/executor/GroupByEngineDataSetTest.java | 24 +-
.../reader/series/SeriesAggregateReaderTest.java | 3 +-
.../reader/series/SeriesReaderByTimestampTest.java | 3 +-
.../db/query/reader/series/SeriesReaderTest.java | 3 +-
.../query/reader/series/SeriesReaderTestUtil.java | 5 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +
.../file/metadata/statistics/Statistics.java | 25 +
.../iotdb/tsfile/read/filter/TimeFilter.java | 2 +-
.../tsfile/write/schema/MeasurementSchema.java | 20 +-
51 files changed, 3210 insertions(+), 123 deletions(-)
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java b/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java
new file mode 100644
index 0000000..e6f6411
--- /dev/null
+++ b/example/jdbc/src/main/java/org/apache/iotdb/IndexContractExample.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import static org.apache.iotdb.JDBCExample.outputResult;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * JDBC demo that times an aggregation query and then creates and drops a PISA index on
+ * {@code root.vehicle.d0.s0}.
+ */
+public class IndexContractExample {
+
+  /**
+   * Connects to the IoTDB server at 127.0.0.1:6667 as root/root, runs the timed query, prints
+   * its result, and issues the CREATE INDEX / DROP INDEX statements.
+   *
+   * @param args unused
+   * @throws ClassNotFoundException if the IoTDB JDBC driver class cannot be loaded
+   * @throws SQLException if connecting or executing a statement fails
+   * @throws IOException kept in the signature for compatibility with existing callers
+   */
+  public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException {
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // Enable the next line to load the demo data before querying.
+//      insertData(statement);
+
+      long startTime = System.nanoTime();
+      // Close the result set as soon as it has been printed instead of relying on the
+      // statement being closed later.
+      try (ResultSet resultSet = statement
+          .executeQuery("SELECT sum(*) FROM root WHERE time < 365000")) {
+        // Time spent in executeQuery, in milliseconds.
+        System.out.println((System.nanoTime() - startTime) / 1000000);
+        outputResult(resultSet);
+      }
+
+      statement.execute("CREATE INDEX on root.vehicle.d0.s0 USING pisa");
+      statement.execute("DROP INDEX pisa ON root.vehicle.d0.s0");
+    }
+  }
+
+  /**
+   * Inserts one point per timestamp in [0, 730000) into {@code root.vehicle.d0.s0}, with value
+   * {@code time % 70}, issuing a FLUSH whenever {@code time % 667 == 1}.
+   *
+   * @param statement open statement used to execute the inserts
+   * @throws SQLException if any insert or flush fails
+   */
+  private static void insertData(Statement statement) throws SQLException {
+    // insert large amount of data time range : 0 ~ 730000
+    for (int time = 0; time < 730000; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+      statement.execute(sql);
+      if (time % 667 == 1) {
+        statement.execute("FLUSH");
+      }
+    }
+  }
+}
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java b/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java
new file mode 100644
index 0000000..373a9c1
--- /dev/null
+++ b/example/jdbc/src/main/java/org/apache/iotdb/IndexExample.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import static org.apache.iotdb.JDBCExample.outputResult;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * JDBC demo that prints the first rows under {@code root} and then creates and drops a PISA
+ * index on {@code root.vehicle.d0.s0}.
+ */
+public class IndexExample {
+
+  /**
+   * Statements that set the {@code root.vehicle} storage group and create its timeseries.
+   * Not referenced anywhere else in this class.
+   */
+  public static String[] create_sql = new String[]{"SET STORAGE GROUP TO root.vehicle",
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s5 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+  };
+
+  /** Values cycled through for the TEXT series s3 (indexed by {@code time % 5}). */
+  private static final String[] stringValue = new String[]{"A", "B", "C", "D", "E"};
+  /** Values cycled through for the BOOLEAN series s4 (indexed by {@code time % 2}). */
+  private static final String[] booleanValue = new String[]{"true", "false"};
+
+  /**
+   * Connects to the IoTDB server at 127.0.0.1:6667 as root/root, prints the rows with
+   * {@code time < 10}, and issues the CREATE INDEX / DROP INDEX statements.
+   *
+   * @param args unused
+   * @throws ClassNotFoundException if the IoTDB JDBC driver class cannot be loaded
+   * @throws SQLException if connecting or executing a statement fails
+   */
+  public static void main(String[] args) throws ClassNotFoundException, SQLException {
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // Enable the next line to load the demo data before querying.
+//      insertData(statement);
+
+      // Close the result set as soon as it has been printed instead of relying on the
+      // statement being closed later.
+      try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root WHERE time < 10")) {
+        outputResult(resultSet);
+      }
+
+      statement.execute("CREATE INDEX on root.vehicle.d0.s0 USING pisa");
+      statement.execute("DROP INDEX pisa ON root.vehicle.d0.s0");
+    }
+  }
+
+  /**
+   * Writes the demo data set into {@code root.vehicle.d0}: several ascending time ranges,
+   * separated by "merge" and "FLUSH" statements, followed by ranges whose timestamps are
+   * lower than, or overlap, data written earlier.
+   *
+   * @param statement open statement used to execute every insert, merge and flush
+   * @throws SQLException if any statement fails
+   */
+  private static void insertData(Statement statement) throws SQLException {
+    // insert large amount of data time range : 3000 ~ 13600
+    for (int time = 3000; time < 13600; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 100);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 17);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 22);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time,
+          stringValue[time % 5]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time,
+          booleanValue[time % 2]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, time);
+      statement.execute(sql);
+    }
+
+    // second ascending range, 13700 ~ 24000, numeric series s0..s2 only
+    for (int time = 13700; time < 24000; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 40);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 123);
+      statement.execute(sql);
+    }
+
+    statement.execute("merge");
+
+    // buffwrite data, unsealed file
+    for (int time = 100000; time < 101000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 20);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 30);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 77);
+      statement.execute(sql);
+    }
+
+    // sequential data, memory data
+    for (int time = 200000; time < 201000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, -time % 20);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, -time % 30);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, -time % 77);
+      statement.execute(sql);
+    }
+
+    statement.execute("FLUSH");
+    // unsequence insert, time < 3000
+    for (int time = 2000; time < 2500; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time + 1);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time + 2);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time,
+          stringValue[time % 5]);
+      statement.execute(sql);
+    }
+
+    // rewrites timestamps 100000 ~ 100500, which were already inserted above
+    for (int time = 100000; time < 100500; time++) {
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 666);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 777);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 888);
+      statement.execute(sql);
+    }
+
+    statement.execute("FLUSH");
+    // unsequence insert, time > 200000
+    for (int time = 200900; time < 201000; time++) {
+
+      String sql = String
+          .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 6666);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 7777);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 8888);
+      statement.execute(sql);
+      sql = String
+          .format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, "goodman");
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time,
+          booleanValue[time % 2]);
+      statement.execute(sql);
+      sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, 9999);
+      statement.execute(sql);
+    }
+  }
+
+}
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index 00f1084..8380b89 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@ -58,7 +58,7 @@ public class JDBCExample {
}
}
- private static void outputResult(ResultSet resultSet) throws SQLException {
+ static void outputResult(ResultSet resultSet) throws SQLException {
if (resultSet != null) {
System.out.println("--------------------------");
final ResultSetMetaData metaData = resultSet.getMetaData();
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/Solution.java b/example/jdbc/src/main/java/org/apache/iotdb/Solution.java
new file mode 100644
index 0000000..e69de29
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index e6d6329..cd64892 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -34,8 +34,8 @@ statement
| DELETE STORAGE GROUP fullPath (COMMA fullPath)* #deleteStorageGroup
| SHOW METADATA #showMetadata // not support yet
| DESCRIBE prefixPath #describePath // not support yet
- | CREATE INDEX ON fullPath USING function=ID indexWithClause? whereClause? #createIndex //not support yet
- | DROP INDEX function=ID ON fullPath #dropIndex //not support yet
+ | CREATE INDEX ON fullPath USING function=ID #createIndex // indexWithClause? whereClause?
+ | DROP INDEX function=ID ON fullPath #dropIndex
| MERGE #merge
| FLUSH prefixPath? (COMMA prefixPath)* (booleanClause)?#flush
| FULL MERGE #fullMerge
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
copy to server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
index f8c007c..10f4ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/IndexException.java
@@ -16,25 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.exception.index;
-package org.apache.iotdb.db.exception.query;
-
-import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.rpc.TSStatusCode;
-public class QueryProcessException extends IoTDBException {
+public class IndexException extends QueryProcessException {
+
- private static final long serialVersionUID = -683191083844850054L;
+ private static final long serialVersionUID = 2585920847533339136L;
- public QueryProcessException(String message) {
- super(message, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ public IndexException(Throwable cause) {
+ super(cause, TSStatusCode.INDEX_ERROR.getStatusCode());
}
- public QueryProcessException(String message, int errorCode) {
- super(message, errorCode);
+ public IndexException(String message) {
+ super(message, TSStatusCode.INDEX_ERROR.getStatusCode());
}
- public QueryProcessException(IoTDBException e) {
- super(e, e.getErrorCode());
+ public IndexException(String message, int errorCode) {
+ super(message, errorCode);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java b/server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java
rename to server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
index 5fd792b..f96c087 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/PathException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/index/UnSupportedIndexTypeException.java
@@ -16,23 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.exception.query;
+package org.apache.iotdb.db.exception.index;
import org.apache.iotdb.rpc.TSStatusCode;
-public class PathException extends QueryProcessException {
+public class UnSupportedIndexTypeException extends IndexException {
- private static final long serialVersionUID = 2141197032898163234L;
+ private static final long serialVersionUID = 4967425512171623007L;
- public PathException() {
- super("Timeseries is null", TSStatusCode.PATH_ERROR.getStatusCode());
- }
-
- public PathException(String message) {
- super(message, TSStatusCode.PATH_ERROR.getStatusCode());
- }
-
- public PathException(String message, int errorCode) {
- super(message, errorCode);
+ public UnSupportedIndexTypeException(String indexType) {
+ super(String.format("Unsupported index type: [%s]", indexType),
+ TSStatusCode.UNSUPPORTED_INDEX_TYPE_ERROR.getStatusCode());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
index f8c007c..6784634 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
@@ -37,4 +37,8 @@ public class QueryProcessException extends IoTDBException {
public QueryProcessException(IoTDBException e) {
super(e, e.getErrorCode());
}
+
+ public QueryProcessException(Throwable cause, int errorCode) {
+ super(cause, errorCode);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
index f0d0d67..f0658d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/SQLParserException.java
@@ -18,15 +18,22 @@
*/
package org.apache.iotdb.db.exception.runtime;
-public class SQLParserException extends RuntimeException{
+public class SQLParserException extends RuntimeException {
+
private static final long serialVersionUID = 3249707655860110299L;
+
public SQLParserException() {
super("Error format in SQL statement, please check whether SQL statement is correct.");
}
+
public SQLParserException(String message) {
super(message);
}
+ public SQLParserException(Throwable cause) {
+ super(cause);
+ }
+
public SQLParserException(String type, String message) {
super(String.format("Unsupported type: [%s]. " + message, type));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java b/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java
new file mode 100644
index 0000000..70dbadc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/FloatDigest.java
@@ -0,0 +1,232 @@
+package org.apache.iotdb.db.index;
+
+import java.math.BigDecimal;
+
+/**
+ * Digest of a window of float values. It records the max, min, average, count and square sum
+ * of the window, together with the window's key, start time, length, code and serial.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} compare only the statistics (avg, count,
+ * max, min, squareSum); the identifying fields are not part of equality.
+ *
+ * @author zhangjinrui
+ */
+public class FloatDigest {
+
+  private String key = "";
+  // For digests produced by merging two children this is set to -code.
+  private long startTime = -1L;
+  private long timeWindow = -1L;
+  private long code = -1L;
+  private long serial = -1L;
+
+  /** Code of the digest this one was merged into, or -1 if it has not been merged. */
+  public long parent = -1L;
+
+  private float max = 0;
+  private float min = 0;
+  private float avg = 0;
+  private long count = 0;
+  private BigDecimal squareSum = BigDecimal.ZERO;
+  private boolean isEmpty = false;
+
+  public void setEmpty(boolean empty) {
+    isEmpty = empty;
+  }
+
+  public boolean isEmpty() {
+    return isEmpty;
+  }
+
+  /**
+   * Hashes the same statistics that {@link #equals(Object)} compares, so that equal digests
+   * have equal hash codes.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    // Start from a constant; seeding with Object.hashCode() would make the hash depend on
+    // object identity and break the equals/hashCode contract.
+    int result = 1;
+    result = prime * result + Float.floatToIntBits(avg);
+    result = prime * result + (int) (count ^ (count >>> 32));
+    result = prime * result + Float.floatToIntBits(max);
+    result = prime * result + Float.floatToIntBits(min);
+    result = prime * result + ((squareSum == null) ? 0 : squareSum.hashCode());
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "FloatDigest [key=" + key + ", startTime=" + startTime + ", timeWindow=" + timeWindow
+        + ", code=" + code + ", parent=" + parent
+        + ", serial=" + serial + ", max=" + max + ", min=" + min + ", avg=" + avg + ", count="
+        + count
+        + ", squareSum=" + squareSum + "]";
+  }
+
+  /**
+   * Two digests are equal when their avg, count, max, min and squareSum are equal.
+   *
+   * @param obj the object to compare with
+   * @return {@code true} if {@code obj} is a {@code FloatDigest} with the same statistics
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // No super.equals() here: the superclass is Object, whose equals() is identity and would
+    // reject every distinct instance.
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    FloatDigest other = (FloatDigest) obj;
+    if (Float.floatToIntBits(avg) != Float.floatToIntBits(other.avg)) {
+      return false;
+    }
+    if (count != other.count) {
+      return false;
+    }
+    if (Float.floatToIntBits(max) != Float.floatToIntBits(other.max)) {
+      return false;
+    }
+    if (Float.floatToIntBits(min) != Float.floatToIntBits(other.min)) {
+      return false;
+    }
+    if (squareSum == null) {
+      return other.squareSum == null;
+    }
+    return squareSum.equals(other.squareSum);
+  }
+
+  public float getMax() {
+    return max;
+  }
+
+  public void setMax(float max) {
+    this.max = max;
+  }
+
+  public float getMin() {
+    return min;
+  }
+
+  public void setMin(float min) {
+    this.min = min;
+  }
+
+  public float getAvg() {
+    return avg;
+  }
+
+  public void setAvg(float avg) {
+    this.avg = avg;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  public void setCount(long count) {
+    this.count = count;
+  }
+
+  public BigDecimal getSquareSum() {
+    return squareSum;
+  }
+
+  public void setSquareSum(BigDecimal squareSum) {
+    this.squareSum = squareSum;
+  }
+
+  /** Creates a digest with all fields at their defaults. */
+  public FloatDigest() {
+    super();
+  }
+
+  /** Creates a digest with default fields and the given empty flag. */
+  public FloatDigest(boolean isEmpty) {
+    this.isEmpty = isEmpty;
+  }
+
+  /** Creates a digest with the given identifying fields and zeroed statistics. */
+  public FloatDigest(String key, long startTime, long timeWindow, long code, long serial) {
+    this.key = key;
+    this.startTime = startTime;
+    this.timeWindow = timeWindow;
+    this.code = code;
+    this.serial = serial;
+  }
+
+  /**
+   * Merges two digests into their parent. The parent takes {@code left}'s key and the code
+   * {@code right.code + 1}; intermediate nodes produced by a merge always have
+   * {@code startTime == -code}.
+   *
+   * <p>If both children are empty the parent is only marked empty: no statistics are combined
+   * and the children's {@code parent} fields are left untouched.
+   *
+   * @param left the left child; its {@code parent} is set to the new code
+   * @param right the right child; its {@code parent} is set to the new code
+   */
+  public FloatDigest(FloatDigest left, FloatDigest right) {
+    this.key = left.key;
+    this.code = right.code + 1;
+    if (left.isEmpty && right.isEmpty) {
+      isEmpty = true;
+      return;
+    }
+    this.startTime = -this.code;
+    this.timeWindow = left.timeWindow + right.timeWindow;
+
+    // NOTE(review): when exactly one child is empty its zeroed max/min still take part in
+    // these comparisons - confirm that is intended.
+    max = (left.max > right.max) ? left.max : right.max;
+    min = (left.min < right.min) ? left.min : right.min;
+    count = left.count + right.count;
+    // Count-weighted mean of the two averages. With no points at all the division would be
+    // 0/0 = NaN, so avg is left at 0 in that case.
+    if (count != 0) {
+      avg = (left.avg / count) * left.count + (right.avg / count) * right.count;
+    }
+    squareSum = left.squareSum.add(right.squareSum);
+
+    // mark for MPISA
+    left.parent = this.code;
+    right.parent = this.code;
+  }
+
+  /** Creates a digest with the given key, window and statistics; code and serial stay -1. */
+  public FloatDigest(String key, long startTime, long timeWindow, float max, float min,
+      long count, float avg, BigDecimal squareSum) {
+    this.key = key;
+    this.startTime = startTime;
+    this.timeWindow = timeWindow;
+    this.max = max;
+    this.min = min;
+    this.count = count;
+    this.avg = avg;
+    this.squareSum = squareSum;
+  }
+
+  /**
+   * Returns the merge of {@code a} and {@code b}; equivalent to
+   * {@code new FloatDigest(a, b)} and independent of the receiver's state.
+   */
+  public FloatDigest generateParent(FloatDigest a, FloatDigest b) {
+    return new FloatDigest(a, b);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getTimeWindow() {
+    return timeWindow;
+  }
+
+  /** Returns {@code startTime + timeWindow}. */
+  public long getEndTime() {
+    return startTime + timeWindow;
+  }
+
+  public void setTimeWindow(long timeWindow) {
+    this.timeWindow = timeWindow;
+  }
+
+  public long getCode() {
+    return code;
+  }
+
+  public void setCode(long code) {
+    this.code = code;
+  }
+
+  public long getSerial() {
+    return serial;
+  }
+
+  public void setSerial(long serial) {
+    this.serial = serial;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
new file mode 100644
index 0000000..06d88d2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.db.index;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
+
+/**
+ * Registry that maps each supported {@link IndexType} to its index instance.
+ */
+public class IndexManager {
+
+  private static final Map<IndexType, PisaIndex> indexMap = new HashMap<>();
+
+
+  static {
+    indexMap.put(IndexType.PISAIndex, PisaIndex.getInstance());
+  }
+
+  /**
+   * Looks up the index registered for a type.
+   *
+   * @param indexType the type to look up
+   * @return the registered index, or {@code null} if nothing is registered for the type
+   */
+  public static PisaIndex getIndexInstance(IndexType indexType) {
+    return indexMap.get(indexType);
+  }
+
+  /** Supported index types. */
+  public enum IndexType {
+    PISAIndex;
+
+    /**
+     * Resolves an index type from its name, ignoring case.
+     *
+     * @param indexType the index name, e.g. {@code "pisa"}
+     * @return the matching type
+     * @throws UnSupportedIndexTypeException if the name is {@code null} or not a known type
+     */
+    public static IndexType getIndexType(String indexType) throws UnSupportedIndexTypeException {
+      if (indexType == null) {
+        // Report a missing name through the declared exception rather than an NPE.
+        throw new UnSupportedIndexTypeException(indexType);
+      }
+      // Locale.ROOT keeps the mapping locale-independent: under a Turkish default locale
+      // "PISA".toLowerCase() yields "pısa" (dotless i) and would not match "pisa".
+      String normalized = indexType.toLowerCase(java.util.Locale.ROOT);
+      switch (normalized) {
+        case "pisa":
+          return PISAIndex;
+        default:
+          throw new UnSupportedIndexTypeException(indexType);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/LoadData.java b/server/src/main/java/org/apache/iotdb/db/index/LoadData.java
new file mode 100644
index 0000000..34216b9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/LoadData.java
@@ -0,0 +1,46 @@
+package org.apache.iotdb.db.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.iotdb.db.index.storage.Config;
+import org.apache.iotdb.db.index.storage.StorageFactory;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Stand-alone loader: initializes the backend model through {@link StorageFactory} and then
+ * inserts randomly generated float points into a {@link PisaIndex}.
+ */
+public class LoadData {
+
+  /**
+   * Initializes the "pisa" backend model with the digest and data column families from
+   * {@link Config}, loads {@code 2^Config.totals} points, and exits the JVM.
+   *
+   * @param args unused
+   * @throws Exception if initialization or loading fails
+   */
+  public static void main(String[] args) throws Exception {
+    List<String> cfs = new ArrayList<>();
+    cfs.add(Config.digest_cf);
+    cfs.add(Config.data_cf);
+    IBackendModelCreator schemaCreator = StorageFactory.getBackendModelCreator();
+    schemaCreator.initialize("pisa", 1, cfs);
+
+    loadPisa(Config.timeSeriesName, (long) Math.pow(2, Integer.valueOf(Config.totals)));
+    // This line is only reached after a successful load, so exit with status 0; a nonzero
+    // status would tell the calling shell or script that the load failed.
+    System.exit(0);
+
+  }
+
+  /**
+   * Inserts {@code total} points with timestamps 0, 1, 2, ... and random float values into a
+   * new index, printing progress every 100 points, then closes the index.
+   *
+   * @param key time series name (currently not used by the method body)
+   * @param total total points
+   * @throws Exception if inserting a point or closing the index fails
+   */
+  private static void loadPisa(String key, long total) throws Exception {
+    PisaIndex<FloatDigest> pisaIndex = new PisaIndex<>();
+    //time window in ms
+    Random random = new Random();
+    long currentTime = 0;
+    for (long i = 0; i < total; i++) {
+      if (i % 100 == 0) {
+        System.out.println("current progress:" + i);
+      }
+      Pair<Long, Float> data = new Pair<>(currentTime++, random.nextFloat());
+      pisaIndex.insertPoint(data);
+
+    }
+
+    pisaIndex.close();
+    System.out.println("total data points:" + total);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java
new file mode 100644
index 0000000..29ca483
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndex.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.storage.Config;
+import org.apache.iotdb.db.index.storage.StorageFactory;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.utils.DataDigestUtil;
+import org.apache.iotdb.db.index.utils.DigestUtil;
+import org.apache.iotdb.db.index.utils.ForestRootStack;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PisaIndex<T extends FloatDigest> {
+
+ /** Shared logger; static and final so it is created once rather than per index instance. */
+ private static final Logger logger = LoggerFactory.getLogger(PisaIndex.class);
+ /** Serial number given to the most recently inserted digest; incremented by insert. */
+ private long maxSerialNo = 0;
+
+ /** Stack of digests that insert pushes to and generateParents pops pairs from. */
+ private ForestRootStack<T> rootNodes;
+ protected IBackendReader reader = StorageFactory.getBackaBackendReader();
+ protected IBackendWriter writer = StorageFactory.getBackaBackendWriter();
+ /** Key under which packages and digests are read and written; set by build. */
+ private String rowkey;
+
+ /** Time window of the package currently being filled; advanced by insertPoint. */
+ private Pair<Long, Long> currentWindow = new Pair<>(0L, Config.timeWindow);
+ /** Package currently being filled by insertPoint. */
+ private FixWindowPackage pkg;
+
+
+ /** Packages at the two ends of a query range; set by queryPlan/query, reset by clean. */
+ private FixWindowPackage leftPackage;
+ private FixWindowPackage rightPackage;
+
+ /** TsFile that build writes the index nodes to and reads them back from. */
+ private static final String INDEX_FILE_PATH =
+     IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0] + File.separator
+         + "index" + File.separator + "test.tsfile";
+
+ /**
+ * Returns the single instance stored in {@code PisaIndexHolder.INSTANCE}.
+ *
+ * @return the shared index; note that the return type is the raw {@code PisaIndex}
+ */
+ public static PisaIndex getInstance() {
+ return PisaIndexHolder.INSTANCE;
+ }
+
+ /**
+  * Builds the PISA index for one time series and runs a hard-coded experiment query on it.
+  *
+  * <p>Steps, in order: (1) reset the per-series state (row key, current package, root stack);
+  * (2) collect statistics per time partition, taking chunk or page statistics when the reader
+  * says they can be used and raw points otherwise; (3) turn the partitions into leaf nodes and
+  * merge them into parent nodes; (4) write every node to the TsFile at {@code INDEX_FILE_PATH};
+  * (5) read that file back and log the sum over the nodes chosen by
+  * {@code getQueryNodes(1, 26)}.
+  *
+  * @param path full path of the series to index
+  * @return always {@code true}; failures are reported through the exception
+  * @throws IndexException wrapping any metadata, storage engine, I/O, write or query failure
+  */
+ public boolean build(Path path) throws IndexException {
+   // TODO initial
+   this.rowkey = path.toString();
+   pkg = new FixWindowPackage(rowkey, currentWindow);
+
+   T lastDigest = getLastDigest();
+
+   if (null == lastDigest) {
+     maxSerialNo = 0;
+     rootNodes = new ForestRootStack<T>();
+   } else {
+     maxSerialNo = lastDigest.getSerial();
+     rootNodes = getRoots(lastDigest);
+   }
+
+   // TODO new codes
+   long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+   QueryContext context = new QueryContext(queryId);
+   Map<Long, Statistics> timePartitionMap = new HashMap<>();
+
+   try {
+     TSDataType dataType = SchemaUtils.getSeriesTypeByPath(path);
+     QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+         .getQueryDataSource(path, context, null);
+     Set<String> allSensors = new HashSet<>();
+     allSensors.add(path.getMeasurement());
+     SeriesAggregateReader seriesReader = new SeriesAggregateReader(path, allSensors, dataType,
+         context,
+         queryDataSource, null, null, null);
+
+     while (seriesReader.hasNextChunk()) {
+       if (seriesReader.canUseCurrentChunkStatistics()) {
+         Statistics chunkStatistics = seriesReader.currentChunkStatistics();
+         updateStatistics(timePartitionMap, chunkStatistics);
+         seriesReader.skipCurrentChunk();
+         continue;
+       }
+       while (seriesReader.hasNextPage()) {
+         if (seriesReader.canUseCurrentPageStatistics()) {
+           Statistics pageStatistic = seriesReader.currentPageStatistics();
+           updateStatistics(timePartitionMap, pageStatistic);
+           seriesReader.skipCurrentPage();
+           continue;
+         }
+
+         // NOTE(review): this inner loop tests hasNextPage() again, so once one page cannot use
+         // its statistics every remaining page of the chunk is read point by point.
+         while (seriesReader.hasNextPage()) {
+           BatchData nextOverlappedPageData = seriesReader.nextPage();
+           while (nextOverlappedPageData.hasCurrent()) {
+             updateStatisticsFromPage(timePartitionMap, nextOverlappedPageData, dataType);
+             nextOverlappedPageData.next();
+           }
+           nextOverlappedPageData.resetBatchData();
+         }
+       }
+     }
+     Set<PisaIndexNode> indexNodeSet = new TreeSet<>(
+         Comparator.comparingLong(PisaIndexNode::getNodeNumber));
+     for (Map.Entry<Long, Statistics> entry : timePartitionMap.entrySet()) {
+       PisaIndexNode node = new PisaIndexNode(entry.getKey(), entry.getValue());
+       indexNodeSet.add(node);
+       // Keep merging upwards for as long as the new node finds a sibling on its level.
+       PisaIndexNode parentNode = mergeAndGenerateParent(indexNodeSet, node, dataType);
+       while (parentNode != null) {
+         indexNodeSet.add(parentNode);
+         parentNode = mergeAndGenerateParent(indexNodeSet, parentNode, dataType);
+       }
+     }
+
+     // TODO store PISA
+     File f = FSFactoryProducer.getFSFactory().getFile(INDEX_FILE_PATH);
+     if (f.exists()) {
+       f.delete();
+     }
+     TsFileWriter tsFileWriter = new TsFileWriter(f);
+     try {
+       // Each series is registered with a schema of the same name; "min" used to be registered
+       // with a schema called "max".
+       tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "min"),
+           new MeasurementSchema("min", dataType, TSEncoding.RLE));
+       tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "max"),
+           new MeasurementSchema("max", dataType, TSEncoding.RLE));
+       tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "first"),
+           new MeasurementSchema("first", dataType, TSEncoding.RLE));
+       tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "last"),
+           new MeasurementSchema("last", dataType, TSEncoding.RLE));
+       tsFileWriter.registerTimeseries(new Path(path.getFullPath(), "sum"),
+           new MeasurementSchema("sum", TSDataType.DOUBLE, TSEncoding.RLE));
+
+       // Experiment: the node set is written 10000 times, each time under a device id made of
+       // the series path followed by the iteration number.
+       for (int i = 0; i < 10000; i++) {
+         for (PisaIndexNode node : indexNodeSet) {
+           TSRecord tsRecord = new TSRecord(node.getNodeNumber(), path.getFullPath() + i);
+           tsRecord.addTuple(DataPoint
+               .getDataPoint(dataType, "min", String.valueOf(node.getStatistics().getMinValue())));
+           tsRecord.addTuple(DataPoint
+               .getDataPoint(dataType, "max", String.valueOf(node.getStatistics().getMaxValue())));
+           tsRecord.addTuple(DataPoint
+               .getDataPoint(dataType, "first",
+                   String.valueOf(node.getStatistics().getFirstValue())));
+           tsRecord.addTuple(DataPoint
+               .getDataPoint(dataType, "last", String.valueOf(node.getStatistics().getLastValue())));
+           tsRecord.addTuple(DataPoint
+               .getDataPoint(TSDataType.DOUBLE, "sum",
+                   String.valueOf(node.getStatistics().getSumValue())));
+
+           // write TSRecord
+           tsFileWriter.write(tsRecord);
+         }
+       }
+     } finally {
+       // Close on the failure path as well so the file handle is not leaked.
+       tsFileWriter.close();
+     }
+
+     // TODO for experiment
+     // create reader and get the readTsFile interface
+     Path sumPath = new Path(path + "0.sum");
+     // The file reader is a local resource: it is named so that it does not shadow the
+     // backend 'reader' field, and it is closed once the query result has been consumed.
+     try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(INDEX_FILE_PATH)) {
+       ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(sequenceReader);
+
+       long startTime = System.nanoTime();
+       Set<Long> queryNodes = getQueryNodes(1, 26);
+       GlobalTimeExpression expression = new GlobalTimeExpression(
+           TimeFilter.in(queryNodes, false));
+
+       List<Path> paths = new ArrayList<>();
+       paths.add(sumPath);
+       QueryExpression queryExpression = QueryExpression.create(paths, expression);
+       QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+       double sum = 0.0;
+       while (queryDataSet.hasNext()) {
+         sum += queryDataSet.next().getFields().get(0).getDoubleV();
+       }
+
+       // NOTE(review): nanoseconds divided by 100000 yields units of 0.1 ms; confirm whether
+       // milliseconds (1000000) were intended.
+       logger.info("queryNodes: {}, sum: {}, cost time: {}", queryNodes, sum,
+           (System.nanoTime() - startTime) / 100000);
+     }
+   } catch (MetadataException | StorageEngineException | IOException | WriteProcessException | QueryProcessException e) {
+     throw new IndexException(e);
+   }
+   return true;
+ }
+
+ private void updateStatistics(Map<Long, Statistics> timePartitionMap, Statistics statistics) {
+ long timePartitionId = fromTimeToTimePartition(statistics.getStartTime()) + 1;
+ if (timePartitionMap.containsKey(timePartitionId)) {
+ timePartitionMap.get(timePartitionId).mergeStatistics(statistics);
+ } else {
+ timePartitionMap.put(timePartitionId, statistics);
+ }
+ }
+
+ /**
+  * Adds the point the batch currently points at to the statistics of its time partition.
+  *
+  * <p>The map key is the partition of the point's timestamp plus one, the same numbering that
+  * {@code updateStatistics} uses, so both code paths feed the same entry for a partition.
+  * Without the offset, points and pre-aggregated statistics of one partition ended up under
+  * two different keys.
+  *
+  * @param timePartitionMap statistics collected so far, keyed by partition id + 1
+  * @param dataInThisPage batch positioned on the point to add
+  * @param dataType data type of the series, used to create and update the statistics
+  */
+ private void updateStatisticsFromPage(Map<Long, Statistics> timePartitionMap,
+     BatchData dataInThisPage, TSDataType dataType) {
+   long timePartitionId = fromTimeToTimePartition(dataInThisPage.currentTime()) + 1;
+   Statistics statistics;
+   if (timePartitionMap.containsKey(timePartitionId)) {
+     statistics = timePartitionMap.get(timePartitionId);
+   } else {
+     statistics = Statistics.getStatsByType(dataType);
+   }
+   statistics.update(dataInThisPage.currentTime(), dataInThisPage.currentValue(), dataType);
+   timePartitionMap.put(timePartitionId, statistics);
+ }
+
+ private long fromTimeToTimePartition(long time) {
+ return time / IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ }
+
+ private PisaIndexNode mergeAndGenerateParent(Set<PisaIndexNode> indexNodeSet,
+ PisaIndexNode node, TSDataType dataType) {
+ long nodeNumber = node.getNodeNumber();
+ int level = node.getLevel();
+ int bias = (int) Math.pow(2, level) - 1;
+ Statistics statistics = Statistics.getStatsByType(dataType);
+ long parentNodeNumber = 0;
+ for (PisaIndexNode indexNode : indexNodeSet) {
+ if (!indexNode.isInMem() || indexNode.getLevel() != level) {
+ continue;
+ }
+ if (indexNode.getNodeNumber() == nodeNumber - bias) {
+ statistics.mergeStatistics(indexNode.getStatistics());
+ parentNodeNumber = nodeNumber + 1;
+ indexNode.setInMem(false);
+ break;
+ } else if (indexNode.getNodeNumber() == nodeNumber + bias) {
+ statistics.mergeStatistics(indexNode.getStatistics());
+ parentNodeNumber = nodeNumber + bias + 1;
+ indexNode.setInMem(false);
+ break;
+ }
+ }
+ if (statistics.isEmpty()) {
+ return null;
+ }
+ node.setInMem(false);
+ statistics.mergeStatistics(node.getStatistics());
+ return new PisaIndexNode(parentNodeNumber, level + 1, statistics);
+ }
+
+ /**
+ * Placeholder for dropping the index of a series; it performs no work yet.
+ *
+ * @param path series whose index should be dropped (currently ignored)
+ * @return always {@code true}
+ */
+ public boolean drop(Path path) {
+ return true;
+ }
+
+ /**
+  * Reads the latest digest stored for the current row key.
+  *
+  * <p>This is a best-effort read: any failure is logged together with its cause and reported
+  * as "no digest" instead of being propagated.
+  *
+  * @return the latest digest, or {@code null} when there is none or the read failed
+  */
+ @SuppressWarnings("unchecked")
+ private T getLastDigest() {
+   T lastDigest = null;
+   try {
+     logger.debug("rowkey:{}", rowkey);
+     lastDigest = (T) reader.getLatestDigest(rowkey);
+   } catch (Exception e) {
+     // Passing the exception as the last argument keeps the stack trace in the log.
+     logger.error("Read lastest digest error : ({})", rowkey, e);
+   }
+   return lastDigest;
+ }
+
+
+ public void insertPoint(Pair<Long, Float> data) throws Exception {
+ if (pkg.cover(data.left)) {
+ pkg.add(data);
+ } else {
+ if (!pkg.isEmpty()) {
+ writer.write(rowkey, Config.data_cf, pkg.getStartTime(), pkg);
+ }
+
+ // insert middle packages
+ while (data.left >= currentWindow.right) {
+ FloatDigest digest = pkg.getDigest();
+ insert((T) digest);
+ currentWindow = new Pair<>(currentWindow.right, currentWindow.right + Config.timeWindow);
+ pkg = new FixWindowPackage(rowkey, currentWindow);
+ }
+ pkg.add(data);
+ }
+ }
+
+ /**
+  * Flushes the package being filled: when it holds data, it is written out and its digest is
+  * inserted. An empty package is left alone.
+  *
+  * @throws Exception if writing the package or inserting its digest fails
+  */
+ public void close() throws Exception {
+   if (pkg.isEmpty()) {
+     return;
+   }
+   writer.write(rowkey, Config.data_cf, pkg.getStartTime(), pkg);
+   FloatDigest lastDigest = pkg.getDigest();
+   insert((T) lastDigest);
+ }
+
+ /**
+  * Rebuilds the root stack from storage.
+  *
+  * <p>The codes returned by {@code getRootCodes(maxSerialNo)} are fetched in one call and pushed
+  * in reverse array order; {@code digest} itself is pushed last when its serial number is odd.
+  *
+  * @param digest the latest stored digest
+  * @return the rebuilt stack
+  */
+ protected ForestRootStack<T> getRoots(T digest) {
+   ForestRootStack<T> stack = new ForestRootStack<T>();
+
+   Long[] codes = getRootCodes(maxSerialNo).toArray(new Long[]{});
+   if (codes.length != 0) {
+     T[] storedRoots = (T[]) reader.getDigests(rowkey, codes);
+     int idx = storedRoots.length - 1;
+     while (idx >= 0) {
+       stack.push(storedRoots[idx]);
+       idx--;
+     }
+   }
+
+   boolean oddSerial = digest.getSerial() % 2 == 1;
+   if (oddSerial) {
+     stack.push(digest);
+   }
+   return stack;
+ }
+
+ /**
+  * Lists the (negated) root codes that belong to a serial number.
+  *
+  * <p>An odd serial number is decremented first. Then, while the serial is greater than 1, the
+  * root code for that serial is recorded and the walk continues with the serial just before
+  * the leftmost leaf under that root.
+  *
+  * @param serialNo serial number to start from
+  * @return negated root codes in the order they were found; empty when the (possibly
+  *     decremented) serial is not greater than 1
+  */
+ protected List<Long> getRootCodes(long serialNo) {
+   long upperSerial = (serialNo % 2 == 1) ? serialNo - 1 : serialNo;
+   List<Long> collected = new ArrayList<Long>();
+
+   while (upperSerial > 1) {
+     long rightmostCode = DigestUtil.serialToCode(upperSerial);
+     long rootCode = DigestUtil.getRootCodeBySerialNum(upperSerial);
+
+     // The code is stored negated; getRoots passes these values to reader.getDigests as is.
+     collected.add(-rootCode);
+
+     long leftmostCode = DigestUtil.getLeftestCode(rootCode, rightmostCode);
+     upperSerial = DigestUtil.codeToSerial(leftmostCode) - 1;
+   }
+
+   return collected;
+ }
+
+ /**
+  * Inserts a digest as the next leaf and persists it together with any parents it completes.
+  *
+  * <p>The digest receives the next serial number and the matching code and is pushed onto the
+  * root stack. For an even serial number, parents are generated for the codes from
+  * {@code code + 1} up to the root code of that serial.
+  *
+  * @param digest digest to insert; its serial and code are overwritten
+  * @return number of digests written (the leaf plus the generated parents)
+  * @throws Exception if writing a digest fails
+  */
+ public int insert(T digest) throws Exception {
+   maxSerialNo += 1;
+   digest.setSerial(maxSerialNo);
+
+   final long leafCode = DigestUtil.serialToCode(maxSerialNo);
+   digest.setCode(leafCode);
+
+   // Everything collected here is flushed at the end, the leaf first.
+   List<T> toPersist = new ArrayList<>();
+   rootNodes.push(digest);
+   toPersist.add(digest);
+
+   logger.debug("Insert Node code number : " + leafCode);
+   boolean evenSerial = maxSerialNo % 2 == 0;
+   if (evenSerial) {
+     long rootCode = DigestUtil.getRootCodeBySerialNum(maxSerialNo);
+     List<T> newParents = generateParents(leafCode + 1, rootCode);
+     toPersist.addAll(newParents);
+   }
+
+   for (T pending : toPersist) {
+     writer.write(rowkey, Config.digest_cf, pending.getStartTime(),
+         pending);
+   }
+   return toPersist.size();
+ }
+
+ /**
+  * Creates one parent per code number in {@code [lowCodeNum, upCodeNum]}.
+  *
+  * <p>Each step pops a pair from the root stack, merges it into a parent and pushes the parent
+  * back, so later steps can merge earlier results.
+  *
+  * @param lowCodeNum first code number of the range (inclusive)
+  * @param upCodeNum last code number of the range (inclusive)
+  * @return the generated parents in creation order
+  */
+ private List<T> generateParents(long lowCodeNum, long upCodeNum) {
+   List<T> created = new ArrayList<>();
+
+   long currentCode = lowCodeNum;
+   while (currentCode <= upCodeNum) {
+     Pair<T, T> pair = rootNodes.popPair();
+     T merged = generateParent(pair.left, pair.right);
+     logger.debug("Generate parent code number : " + merged.getCode());
+     rootNodes.push(merged);
+     created.add(merged);
+     ++currentCode;
+   }
+
+   return created;
+ }
+
+ /**
+ * Merges two digests into their parent by delegating to {@code a.generateParent(a, b)} and
+ * casting the result to {@code T}.
+ *
+ * @param a first child; also the object the merge is invoked on
+ * @param b second child
+ * @return the parent digest
+ */
+ @SuppressWarnings("unchecked")
+ private T generateParent(T a, T b) {
+ return (T) a.generateParent(a, b);
+ }
+
+ /** Forgets the boundary packages remembered by {@code queryPlan} and {@code query}. */
+ public void clean() {
+   rightPackage = null;
+   leftPackage = null;
+ }
+
+ /**
+ * Counts how many digests a call to {@code query} for the same range would combine.
+ *
+ * <p>The boundary packages looked up here are stored in {@code leftPackage} and
+ * {@code rightPackage} and are not reset by this method, so a following {@code query} call
+ * finds them already set.
+ *
+ * @param range (start, end) of the time range
+ * @return 0 when neither boundary package exists, 1 when both boundaries fall into the same
+ * package, otherwise the number of digests collected
+ */
+ public int queryPlan(Pair<Long, Long> range) {
+ leftPackage = reader.getBeforeOrEqualPackage(rowkey, range.left);
+ rightPackage = reader.getBeforeOrEqualPackage(rowkey, range.right);
+ // No packages.
+ // xxx xxx left xxx right xxx xxx
+ if (leftPackage == null && rightPackage == null) {
+ return 0;
+ }
+
+ // 1 packages.
+ // xxx xxx left ... right xxx xxx
+ else if (leftPackage != null && rightPackage != null
+ && leftPackage.getStartTime() == rightPackage.getStartTime()) {
+ return 1;
+ }
+ // NOTE(review): when exactly one of the two packages is null, the code below dereferences
+ // it; confirm whether the reader can return such a combination.
+ List<T> digests = new ArrayList<>();
+
+ // query left bound package.
+ long leftStartTime = leftPackage.getStartTime();
+ T leftDigest = (T) leftPackage
+ .getDigest(range.left, leftStartTime + leftPackage.getTimeWindow());
+ if (null != leftDigest) {
+ digests.add(leftDigest);
+ }
+
+ // query right bound package.
+ long rightStartTime = rightPackage.getStartTime();
+ T rightDigest = (T) rightPackage.getDigest(rightStartTime, range.right);
+ if (null != rightDigest) {
+ digests.add(rightDigest);
+ }
+
+ // Digests strictly between the two boundary packages.
+ T leftAfterDigest = (T) reader.getAfterOrEqualDigest(rowkey, leftStartTime + 1);
+ T rightBeforeDigest = (T) reader.getBeforeOrEqualDigest(rowkey, rightStartTime - 1);
+
+ long downSerial = leftAfterDigest.getSerial();
+ long upSerial = rightBeforeDigest.getSerial();
+
+ // Nothing lies between the boundary packages.
+ if (downSerial > upSerial) {
+ return digests.size();
+ }
+
+ logger.debug("Query range(Serial Number) : " + downSerial + "-" + upSerial);
+ Pair<Long, Long> bounds = new Pair<>(downSerial, upSerial);
+ // An even lower serial and an odd upper serial are counted on their own and removed from
+ // the range handed to queryBetween.
+ if (downSerial % 2 == 0) {
+ digests.add(leftAfterDigest);
+ bounds.left = bounds.left + 1;
+ }
+ if (upSerial % 2 == 1) {
+ digests.add(rightBeforeDigest);
+ bounds.right = bounds.right - 1;
+ }
+ if (bounds.left < bounds.right) {
+ T[] parentArray = queryBetween(bounds);
+ List<T> parentNodes = Arrays.asList(parentArray);
+ digests.addAll(parentNodes);
+ }
+ return digests.size();
+ }
+
+ public T query(Pair<Long, Long> range) {
+ if (leftPackage == null) {
+ leftPackage = reader.getBeforeOrEqualPackage(rowkey, range.left);
+ }
+ if (rightPackage == null) {
+ rightPackage = reader.getBeforeOrEqualPackage(rowkey, range.right);
+ }
+
+ // No packages.
+ // xxx xxx left xxx right xxx xxx
+ if (leftPackage == null && rightPackage == null) {
+ return null;
+ }
+
+ // 1 packages.
+ // xxx xxx left ... right xxx xxx
+ else if (leftPackage != null && rightPackage != null
+ && leftPackage.getStartTime() == rightPackage.getStartTime()) {
+ return (T) leftPackage.getDigest(range.left, range.right);
+ }
+
+ List<T> digests = new ArrayList<>();
+
+ // query left bound package.
+ long leftStartTime = leftPackage.getStartTime();
+ T leftDigest = (T) leftPackage
+ .getDigest(range.left, leftStartTime + leftPackage.getTimeWindow());
+ if (null != leftDigest) {
+ digests.add(leftDigest);
+ }
+
+ // query right bound package.
+ long rightStartTime = rightPackage.getStartTime();
+ T rightDigest = (T) rightPackage.getDigest(rightStartTime, range.right);
+ if (null != rightDigest) {
+ digests.add(rightDigest);
+ }
+
+ long time = System.currentTimeMillis();
+ T leftAfterDigest = (T) reader.getAfterOrEqualDigest(rowkey, leftStartTime + 1);
+ T rightBeforeDigest = (T) reader.getBeforeOrEqualDigest(rowkey, rightStartTime - 1);
+ System.out.println("this step time cost:" + (System.currentTimeMillis() - time));
+ long downSerial = leftAfterDigest.getSerial();
+ long upSerial = rightBeforeDigest.getSerial();
+
+ if (downSerial > upSerial) {
+ return (T) DataDigestUtil.aggregate(rowkey, range.left,
+ (digests.toArray(new FloatDigest[]{})));
+ }
+
+ System.out.println("Query range(Serial Number) : " + downSerial + "-" + upSerial);
+ Pair<Long, Long> bounds = new Pair<>(downSerial, upSerial);
+ if (downSerial % 2 == 0) {
+ digests.add(leftAfterDigest);
+ bounds.left = bounds.left + 1;
+ }
+ if (upSerial % 2 == 1) {
+ digests.add(rightBeforeDigest);
+ bounds.right = bounds.right - 1;
+ }
+ if (bounds.left < bounds.right) {
+ T[] parentArray = queryBetween(bounds);
+ List<T> parentNodes = Arrays.asList(parentArray);
+ digests.addAll(parentNodes);
+ }
+
+ T result = (T) DataDigestUtil.aggregate(rowkey, range.left,
+ (digests.toArray(new FloatDigest[]{})));
+ this.clean();
+ return result;
+ }
+
+ /**
+ * Fetches the digests whose subtrees together span a range of leaf serial numbers.
+ *
+ * <p>Starting from the root above the upper serial, the loop compares the leftmost leaf under
+ * the current code with the lower serial: when that leaf lies to the right of the lower
+ * serial, the code is recorded and the walk continues to the left of it; when it lies to the
+ * left, the code is decremented and the test repeated. The loop ends when the leftmost leaf
+ * equals the lower serial.
+ *
+ * @param range (lower serial, upper serial)
+ * @return the digests the reader returns for the recorded codes
+ */
+ public T[] queryBetween(Pair<Long, Long> range) {
+ long lowerSerial = range.left; // leftLeafNumber
+ long upperSerial = range.right; // rightLeafNumber
+
+ long upperCode = DigestUtil.serialToCode(upperSerial); // rightNodeNumber
+ long parentCode = DigestUtil.getRootCodeBySerialNum(upperSerial); // rootNodeNumber
+ long leftCode = DigestUtil.getLeftestCode(parentCode, upperCode); // leftestNodeNumber
+ long leftSerial = DigestUtil.codeToSerial(leftCode); // leftestLeafNumber
+
+ List<Long> parentCodes = new ArrayList<>();
+ while (leftSerial != lowerSerial) {
+ if (leftSerial > lowerSerial) {
+ // The current subtree lies entirely inside the range: record it and continue to its left.
+ parentCodes.add(-parentCode);
+ upperSerial = leftSerial - 1;
+ upperCode = DigestUtil.serialToCode(upperSerial);
+ parentCode = DigestUtil.getRootCodeBySerialNum(upperSerial);
+ } else {
+ // The current subtree starts before the range: try the preceding code instead.
+ --parentCode;
+ }
+ leftCode = DigestUtil.getLeftestCode(parentCode, upperCode);
+ leftSerial = DigestUtil.codeToSerial(leftCode);
+ }
+
+ /*
+ * Add the negated code number to the candidate set, where it serves as the timestamp.
+ */
+ parentCodes.add(-parentCode);
+ Long[] codeArray = parentCodes.toArray(new Long[0]);
+
+ // Timing output for the single batched read.
+ long start = System.currentTimeMillis();
+ System.out.println("query nodes size: " + codeArray.length);
+ T[] parentDigests = (T[]) reader.getDigests(rowkey, codeArray);
+ System.out.println(System.currentTimeMillis() - start);
+
+ return parentDigests;
+ }
+
+ /**
+  * Collects the root node numbers of the trees that tile the leaf range from right to left.
+  *
+  * <p>Starting at the right leaf, the root above it is recorded as long as the leftmost leaf
+  * of that tree is not left of {@code leftLeafNumber}; the walk then continues with the leaf
+  * just before that tree. It stops when a tree starts exactly at {@code leftLeafNumber} or
+  * would start before it.
+  *
+  * @param leftLeafNumber first leaf of the range
+  * @param rightLeafNumber last leaf of the range
+  * @return the recorded root node numbers (possibly empty)
+  */
+ private Set<Long> getQueryNodes(long leftLeafNumber, long rightLeafNumber) {
+   Set<Long> roots = new HashSet<>();
+   long currentRight = rightLeafNumber;
+   long treeStart = PisaIndexNode.getLeftestLeafNumber(currentRight);
+   long treeRoot = PisaIndexNode.getRootNodeNumber(currentRight);
+
+   for (;;) {
+     if (treeStart < leftLeafNumber) {
+       break;
+     }
+     roots.add(treeRoot);
+     if (treeStart == leftLeafNumber) {
+       break;
+     }
+     currentRight = treeStart - 1;
+     treeRoot = DigestUtil.getRootCodeBySerialNum(currentRight);
+     treeStart = PisaIndexNode.getLeftestLeafNumber(currentRight);
+   }
+
+   return roots;
+ }
+
+ /**
+ * Holder of the instance returned by {@code getInstance()}. The instance is created with
+ * the raw {@code PisaIndex} type when this nested class is initialized.
+ */
+ private static class PisaIndexHolder {
+
+ static final PisaIndex INSTANCE = new PisaIndex();
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java
new file mode 100644
index 0000000..4c8c0ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexNode.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+
+/**
+ * One node of the PISA index: a node number, its level and the statistics it summarizes.
+ *
+ * <p>Leaves are created from a leaf number and sit on level 1; parents are created with an
+ * explicit node number and level. The static helpers convert between leaf numbers and node
+ * numbers, e.g. leaves 1, 2, 3, 4 map to nodes 1, 2, 4, 5 and the roots above them are
+ * 1, 3, 4, 7.
+ */
+public class PisaIndexNode {
+
+  /** Level that the leaf constructor assigns. */
+  private static final int LEAF_LEVEL = 1;
+
+  private final long leafNumber;
+  private final long nodeNumber;
+  private final int level;
+  private boolean isInMem;
+  private final Statistics statistics;
+
+  /**
+   * Creates a leaf node; its node number is derived from the leaf number.
+   *
+   * @param leafNumber number of the leaf
+   * @param statistics statistics of the leaf
+   */
+  public PisaIndexNode(long leafNumber, Statistics statistics) {
+    this.leafNumber = leafNumber;
+    this.nodeNumber = getNodeNumberByLeafNumber(leafNumber);
+    this.level = LEAF_LEVEL;
+    this.statistics = statistics;
+    this.isInMem = true;
+  }
+
+  /**
+   * Creates a node with an explicit node number and level (used for parents).
+   *
+   * @param nodeNumber number of the node
+   * @param level level of the node
+   * @param statistics statistics of the node
+   */
+  public PisaIndexNode(long nodeNumber, int level, Statistics statistics) {
+    this.leafNumber = 0;
+    this.nodeNumber = nodeNumber;
+    this.level = level;
+    this.statistics = statistics;
+    this.isInMem = true;
+  }
+
+  /**
+   * Returns the leaf number of a leaf node.
+   *
+   * <p>The check compares against the level the leaf constructor really assigns (1). It used
+   * to compare against 0, which no node ever has, so the method threw for every node.
+   *
+   * @return the leaf number
+   * @throws IndexException if this node is not a leaf
+   */
+  public long getLeafNumber() throws IndexException {
+    if (level != LEAF_LEVEL) {
+      throw new IndexException("Internal node doesn't have leaf number");
+    }
+    return leafNumber;
+  }
+
+  public long getNodeNumber() {
+    return nodeNumber;
+  }
+
+  public Statistics getStatistics() {
+    return statistics;
+  }
+
+  public int getLevel() {
+    return level;
+  }
+
+  public boolean isInMem() {
+    return isInMem;
+  }
+
+  public void setInMem(boolean inMem) {
+    isInMem = inMem;
+  }
+
+  /**
+   * Converts a leaf number into the number of the node that represents the leaf.
+   *
+   * @param leafNumber leaf number
+   * @return for an odd leaf number the root number of that leaf; for an even one the root
+   *     number of the preceding leaf plus one
+   */
+  public static long getNodeNumberByLeafNumber(long leafNumber) {
+    if (leafNumber % 2 == 1) {
+      return getRootNodeNumber(leafNumber);
+    } else {
+      return getRootNodeNumber(leafNumber - 1) + 1;
+    }
+  }
+
+  /**
+   * Computes the number of the root above a leaf when that leaf is the rightmost one.
+   *
+   * @param leafNumber leaf number
+   * @return {@code 2 * leafNumber} minus the number of one-bits in {@code leafNumber}
+   */
+  public static long getRootNodeNumber(long leafNumber) {
+    return (leafNumber << 1) - Long.bitCount(leafNumber);
+  }
+
+  /**
+   * Finds the leaf number whose node number equals {@code nodeNumber} by scanning the
+   * candidates from {@code nodeNumber / 2} up to {@code nodeNumber}.
+   *
+   * @param nodeNumber node number to look up
+   * @return the leaf number, or {@code -1} when no leaf maps to that node number
+   */
+  public static long getLeafNumberByNodeNumber(long nodeNumber) {
+    long i = nodeNumber / 2;
+    while (i <= nodeNumber) {
+      if (getNodeNumberByLeafNumber(i) == nodeNumber) {
+        return i;
+      }
+      i++;
+    }
+    return -1L;
+  }
+
+  /**
+   * Finds the leftmost leaf of the tree whose rightmost leaf is given.
+   *
+   * @param rightestLeafNumber rightmost leaf of the tree
+   * @return leaf number of the tree's leftmost leaf
+   */
+  public static long getLeftestLeafNumber(long rightestLeafNumber) {
+    long root = getRootNodeNumber(rightestLeafNumber);
+    long depth = root - getNodeNumberByLeafNumber(rightestLeafNumber);
+    // Shift a long: "2 << depth" is an int shift and overflows once depth reaches 30.
+    long nodesOfTree = 2L << depth;
+    return getLeafNumberByNodeNumber(root - nodesOfTree + 2L);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java
new file mode 100644
index 0000000..b7f9015
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/PisaIndexTree.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.index;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+/**
+ * Serializable view of an index: the statistics of every node keyed by node number.
+ */
+public class PisaIndexTree {
+
+  private Map<Long, Statistics> nodeNumberStatisticMap;
+
+  public PisaIndexTree(Map<Long, Statistics> nodeNumberStatisticMap) {
+    this.nodeNumberStatisticMap = nodeNumberStatisticMap;
+  }
+
+  public Map<Long, Statistics> getNodeNumberStatisticMap() {
+    return nodeNumberStatisticMap;
+  }
+
+  /**
+   * Writes the entry count followed by each (node number, statistics) pair.
+   *
+   * @param outputStream stream to write to
+   * @return total number of bytes reported by the individual writes
+   */
+  public int serialize(OutputStream outputStream) throws IOException, IndexException {
+    int written = ReadWriteIOUtils.write(nodeNumberStatisticMap.size(), outputStream);
+    for (Map.Entry<Long, Statistics> nodeEntry : nodeNumberStatisticMap.entrySet()) {
+      // Unbox first so the primitive long is what gets written.
+      long number = nodeEntry.getKey();
+      Statistics stats = nodeEntry.getValue();
+      written += ReadWriteIOUtils.write(number, outputStream);
+      written += stats.serialize(outputStream);
+    }
+    return written;
+  }
+
+  /**
+   * Reads back what {@link #serialize(OutputStream)} wrote.
+   *
+   * @param inputStream stream to read from
+   * @param dataType data type used to deserialize each statistics object
+   * @return a tree holding the entries that were read
+   */
+  public static PisaIndexTree deserialize(InputStream inputStream, TSDataType dataType)
+      throws IOException {
+    final int entryCount = ReadWriteIOUtils.readInt(inputStream);
+    Map<Long, Statistics> loaded = new HashMap<>();
+    for (int read = 0; read < entryCount; read++) {
+      long number = ReadWriteIOUtils.readLong(inputStream);
+      Statistics stats = Statistics.deserialize(inputStream, dataType);
+      loaded.put(number, stats);
+    }
+    return new PisaIndexTree(loaded);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/QueryData.java b/server/src/main/java/org/apache/iotdb/db/index/QueryData.java
new file mode 100644
index 0000000..d06a336
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/QueryData.java
@@ -0,0 +1,31 @@
+package org.apache.iotdb.db.index;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+// query data in cassandra or mysql using pisa
+public class QueryData {
+
+  /** Arguments used when the program is started without any: key, start time, end time. */
+  private static final String[] DEFAULT_ARGS = {"key", "0", "16384000000"};
+
+  /**
+   * Runs one range query against a PISA index and prints the plan size, the elapsed time and
+   * the count of the resulting digest.
+   *
+   * <p>Without arguments the built-in demo range is queried. Arguments supplied by the caller
+   * are now honoured instead of being overwritten, which also makes the usage check
+   * reachable.
+   *
+   * @param args either empty, or {@code key starttime endtime}
+   */
+  public static void main(String[] args) {
+    if (args.length == 0) {
+      args = DEFAULT_ARGS;
+    }
+    if (args.length != 3) {
+      System.out.println("key starttime, endtime");
+      return;
+    }
+    // args[0] (the key) is accepted for the command line format only: PisaIndex has no
+    // constructor or setter here that takes it.
+    long start = Long.parseLong(args[1]);
+    long end = Long.parseLong(args[2]);
+    PisaIndex<FloatDigest> pisaIndex = new PisaIndex<>();
+    System.out.println(String.format("will search %d nodes...",
+        pisaIndex.queryPlan(new Pair<Long, Long>(start, end))));
+
+    long time = System.currentTimeMillis();
+    FloatDigest digest = pisaIndex.query(new Pair<Long, Long>(start, end));
+    time = System.currentTimeMillis() - time;
+
+    System.out.println("digest time cost:" + time);
+    if (digest == null) {
+      // query returns null when it finds no package for the range.
+      System.out.println("no digest found for the given range");
+      System.exit(1);
+    }
+    System.out.println("count value:" + digest.getCount());
+    // The query succeeded, so report success (status 0).
+    System.exit(0);
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java b/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java
new file mode 100644
index 0000000..298b8c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/Config.java
@@ -0,0 +1,54 @@
+package org.apache.iotdb.db.index.storage;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * Static settings of the PISA index demo: storage engine, column family names, window size
+ * and data generation parameters.
+ *
+ * <p>All settings are plain mutable static fields with built-in defaults;
+ * {@link #load(String)} overrides some of them from a properties file.
+ */
+public class Config {
+
+  public static String storage_engine = "local";
+  public static String digest_cf = "digest";
+  public static String data_cf = "data";
+  public static long timeWindow = 100L;
+
+  //time series name
+  public static String timeSeriesName = "key";
+
+  //total number of points: Math.pow(2,total)
+  public static String totals = "10";
+
+  public static boolean writePackages = true;
+
+
+  /**
+   * Overrides settings from a properties file.
+   *
+   * <p>After the file has been read, all environment variables are added on top, so an
+   * environment variable wins over a file entry of the same name. If the file cannot be read,
+   * the error is printed and every setting keeps its current value. {@code totals} is not
+   * read by this method.
+   *
+   * @param path path of the properties file; {@code null} makes the call a no-op
+   */
+  public static void load(String path) {
+    if (path == null) {
+      return;
+    }
+
+    Properties properties = new Properties();
+
+    // try-with-resources closes the stream; it used to stay open.
+    try (FileInputStream fileInputStream = new FileInputStream(path)) {
+      properties.load(fileInputStream);
+      properties.putAll(System.getenv());
+    } catch (Exception e) {
+      // Best effort: report the problem and fall back to the current values.
+      e.printStackTrace();
+    }
+
+    storage_engine = properties.getOrDefault("storage_engine", storage_engine).toString();
+    digest_cf = properties.getOrDefault("digest_cf", digest_cf).toString();
+    data_cf = properties.getOrDefault("data_cf", data_cf).toString();
+    timeSeriesName = properties.getOrDefault("timeSeriesName", timeSeriesName).toString();
+
+    timeWindow = Long
+        .parseLong(properties.getOrDefault("timeWindow", String.valueOf(timeWindow)).toString());
+
+    writePackages = Boolean
+        .parseBoolean(properties.getOrDefault("writePackages", writePackages).toString());
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java b/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java
new file mode 100644
index 0000000..9f4327f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/StorageFactory.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.db.index.storage;
+
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.memory.FakeByteStore;
+import org.apache.iotdb.db.index.storage.memory.FakeStore;
+
+/**
+ * Hands out the storage backend selected by {@code Config.storage_engine}: the
+ * {@link FakeStore} for {@code "local"}, the {@link FakeByteStore} for any other value.
+ * Reader, writer and model creator are always the same shared store object.
+ */
+public class StorageFactory {
+
+  private static FakeStore fakeStore = new FakeStore();
+  private static FakeByteStore fakeByteStore = new FakeByteStore();
+
+  public static IBackendReader getBackaBackendReader() {
+    if (usesLocalStore()) {
+      return fakeStore;
+    }
+    return fakeByteStore;
+  }
+
+  public static IBackendWriter getBackaBackendWriter() {
+    if (usesLocalStore()) {
+      return fakeStore;
+    }
+    return fakeByteStore;
+  }
+
+  public static IBackendModelCreator getBackendModelCreator() {
+    if (usesLocalStore()) {
+      return fakeStore;
+    }
+    return fakeByteStore;
+  }
+
+  /** Tells whether the configured engine is {@code "local"}; a null setting fails with an NPE. */
+  private static boolean usesLocalStore() {
+    return Config.storage_engine.equals("local");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java
new file mode 100644
index 0000000..d5c7e1f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendModelCreator.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+import java.util.List;
+
+
+public interface IBackendModelCreator {
+
+ /**
+ * Initializes the schema: creates the keyspace and a group of column families.
+ */
+ public void initialize(String ks, int replicaFactor, List<String> columnfamilies)
+ throws Exception;
+
+ /**
+ * Adds a column family to the storage system.
+ *
+ * @param ks database name
+ * @param cf column family name
+ */
+ public void addColumnFamily(String ks, String cf) throws Exception;
+ // NOTE(review): presumably the float-valued counterpart of addColumnFamily -- TODO confirm.
+ public void addFloatColumnFamily(String ks, String cf) throws Exception;
+
+ /**
+ * Adds several column families to the storage system at once. <br> Recommended when many
+ * column families must be created in a short time (with Cassandra as the storage system,
+ * creating them one by one has performance problems).
+ */
+ public void addColumnFamilies(String ks, List<String> cfs) throws Exception;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java
new file mode 100644
index 0000000..775c264
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendReader.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+
+public interface IBackendReader {
+ // NOTE(review): both in-memory stores in this package leave getBytes unimplemented (null).
+ byte[] getBytes(String key, String cf, long startTime);
+
+ /**
+ * Given the start times of some data packages, returns their digests in one call.
+ */
+ FloatDigest[] getDigests(String key, Long[] timeStamps);
+ /** Returns the digest with the greatest start time not after timestamp, or null if none. */
+ FloatDigest getBeforeOrEqualDigest(String key, long timestamp);
+ /** Returns the package with the greatest start time not after timestamp, or null if none. */
+ FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp);
+ /** Returns the digest with the smallest start time not before timestamp, or null if none. */
+ FloatDigest getAfterOrEqualDigest(String key, long timestamp);
+
+ /**
+ * Returns the digest with the greatest start time stored for the key. NOTE(review): the earlier
+ * text here described a timestamp argument and an alignment caveat that this signature does
+ * not have; it looks copied from another method -- confirm before relying on it.
+ */
+ FloatDigest getLatestDigest(String key) throws Exception;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java
new file mode 100644
index 0000000..e2c729a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/interfaces/IBackendWriter.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.index.storage.interfaces;
+
+
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+
+public interface IBackendWriter {
+
+ /**
+ * @param startTimestamp aligned start time of the package; the in-memory stores key on it.
+ */
+ void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+ throws Exception;
+
+ /**
+ * @param startTimestamp aligned start time of the digest; the in-memory stores key on it.
+ */
+ void write(String key, String cf, long startTimestamp, FloatDigest digest)
+ throws Exception;
+ // NOTE(review): raw-bytes overload; both in-memory stores leave it as an empty stub.
+ void write(String key, String cf, long startTimestamp, byte[] digest)
+ throws Exception;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java
new file mode 100644
index 0000000..458bfb8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeByteStore.java
@@ -0,0 +1,140 @@
+package org.apache.iotdb.db.index.storage.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.storage.model.serializer.FixWindowPackageSerializer;
+import org.apache.iotdb.db.index.storage.model.serializer.FloatDigestSerializer;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * In-memory backend that keeps packages and digests in serialized (byte[]) form, so every read
+ * and write passes through the serializers. Entries are grouped by key and sorted by start time;
+ * the keyspace and column-family arguments are accepted but ignored.
+ */
+public class FakeByteStore implements IBackendModelCreator, IBackendWriter, IBackendReader {
+
+  private Map<String, TreeMap<Long, byte[]>> data = new HashMap<>();
+  private Map<String, TreeMap<Long, byte[]>> digests = new HashMap<>();
+
+  private FixWindowPackageSerializer dataSerializer = FixWindowPackageSerializer.getInstance();
+  private FloatDigestSerializer digestSerializer = FloatDigestSerializer.getInstance();
+
+  /** Does nothing: there is no schema to create in memory. */
+  @Override
+  public void initialize(String ks, int replicaFactor, List<String> columnFamilies)
+      throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addColumnFamilies(String ks, List<String> cfs) throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addFloatColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  /**
+   * Looks up the digests stored exactly at the given start times.
+   *
+   * @return the digests that exist, in request order; {@code null} when the key is unknown
+   */
+  @Override
+  public FloatDigest[] getDigests(String key, Long[] timeStamps) {
+    Map<Long, byte[]> stored = digests.get(key);
+    if (stored == null) {
+      return null;
+    }
+    List<FloatDigest> found = new ArrayList<>();
+    for (long time : timeStamps) {
+      byte[] raw = stored.get(time);
+      if (raw != null) {
+        found.add(digestSerializer.deserialize(key, time, raw));
+      }
+    }
+    return found.toArray(new FloatDigest[0]);
+  }
+
+  /** Returns the digest with the greatest start time not after timestamp, or null if none. */
+  @Override
+  public FloatDigest getBeforeOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, byte[]> stored = digests.get(key);
+    return stored == null ? null : decodeDigest(key, stored.floorEntry(timestamp));
+  }
+
+  /** Returns the package with the greatest start time not after timestamp, or null if none. */
+  @Override
+  public FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp) {
+    TreeMap<Long, byte[]> stored = data.get(key);
+    Map.Entry<Long, byte[]> hit = stored == null ? null : stored.floorEntry(timestamp);
+    if (hit == null) {
+      return null;
+    }
+    return dataSerializer.deserialize(key, hit.getKey(), hit.getValue());
+  }
+
+  /** Returns the digest with the smallest start time not before timestamp, or null if none. */
+  @Override
+  public FloatDigest getAfterOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, byte[]> stored = digests.get(key);
+    return stored == null ? null : decodeDigest(key, stored.ceilingEntry(timestamp));
+  }
+
+  /** Returns the digest with the greatest start time stored for the key, or null if none. */
+  @Override
+  public FloatDigest getLatestDigest(String key) throws Exception {
+    TreeMap<Long, byte[]> stored = digests.get(key);
+    return stored == null ? null : decodeDigest(key, stored.lastEntry());
+  }
+
+  /** Stores the serialized package under (key, startTimestamp), replacing an earlier one. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+      throws Exception {
+    TreeMap<Long, byte[]> slot = data.computeIfAbsent(key, k -> new TreeMap<>());
+    slot.put(startTimestamp, dataSerializer.serialize(dp));
+  }
+
+  /** Stores the serialized digest under (key, startTimestamp), replacing an earlier one. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, FloatDigest digest)
+      throws Exception {
+    TreeMap<Long, byte[]> slot = digests.computeIfAbsent(key, k -> new TreeMap<>());
+    slot.put(startTimestamp, digestSerializer.serialize(digest));
+  }
+
+  /** Not implemented: the raw bytes are ignored. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, byte[] digest) throws Exception {
+  }
+
+  /** Not implemented: always returns null. */
+  @Override
+  public byte[] getBytes(String key, String cf, long startTime) {
+    return null;
+  }
+
+  /** Deserializes a digest entry; a null entry (nothing found) yields null. */
+  private FloatDigest decodeDigest(String key, Map.Entry<Long, byte[]> entry) {
+    if (entry == null) {
+      return null;
+    }
+    return digestSerializer.deserialize(key, entry.getKey(), entry.getValue());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java
new file mode 100644
index 0000000..0c23dc2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/memory/FakeStore.java
@@ -0,0 +1,125 @@
+package org.apache.iotdb.db.index.storage.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendModelCreator;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendReader;
+import org.apache.iotdb.db.index.storage.interfaces.IBackendWriter;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * In-memory backend that keeps the package and digest objects themselves, grouped by key and
+ * sorted by start time. The keyspace and column-family arguments are accepted but ignored.
+ */
+public class FakeStore implements IBackendModelCreator, IBackendWriter, IBackendReader {
+
+  private Map<String, TreeMap<Long, FixWindowPackage>> data = new HashMap<>();
+  private Map<String, TreeMap<Long, FloatDigest>> digests = new HashMap<>();
+
+  /** Does nothing: there is no schema to create in memory. */
+  @Override
+  public void initialize(String ks, int replicaFactor, List<String> columnfamilies)
+      throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addColumnFamilies(String ks, List<String> cfs) throws Exception {
+  }
+
+  /** Does nothing: column families are not modelled. */
+  @Override
+  public void addFloatColumnFamily(String ks, String cf) throws Exception {
+  }
+
+  /**
+   * Looks up the digests stored exactly at the given start times.
+   *
+   * @return the digests that exist, in request order; {@code null} when the key has no digests
+   */
+  @Override
+  public FloatDigest[] getDigests(String key, Long[] timeStamps) {
+    Map<Long, FloatDigest> stored = digests.get(key);
+    if (stored == null || stored.isEmpty()) {
+      return null;
+    }
+    List<FloatDigest> found = new ArrayList<>();
+    for (long time : timeStamps) {
+      FloatDigest digest = stored.get(time);
+      if (digest != null) {
+        found.add(digest);
+      }
+    }
+    return found.toArray(new FloatDigest[0]);
+  }
+
+  /** Returns the digest with the greatest start time not after timestamp, or null if none. */
+  @Override
+  public FloatDigest getBeforeOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, FloatDigest> stored = digests.get(key);
+    return stored == null ? null : valueOrNull(stored.floorEntry(timestamp));
+  }
+
+  /** Returns the package with the greatest start time not after timestamp, or null if none. */
+  @Override
+  public FixWindowPackage getBeforeOrEqualPackage(String key, long timestamp) {
+    TreeMap<Long, FixWindowPackage> stored = data.get(key);
+    return stored == null ? null : valueOrNull(stored.floorEntry(timestamp));
+  }
+
+  /** Returns the digest with the smallest start time not before timestamp, or null if none. */
+  @Override
+  public FloatDigest getAfterOrEqualDigest(String key, long timestamp) {
+    TreeMap<Long, FloatDigest> stored = digests.get(key);
+    return stored == null ? null : valueOrNull(stored.ceilingEntry(timestamp));
+  }
+
+  /** Returns the digest with the greatest start time stored for the key, or null if none. */
+  @Override
+  public FloatDigest getLatestDigest(String key) throws Exception {
+    TreeMap<Long, FloatDigest> stored = digests.get(key);
+    return stored == null ? null : valueOrNull(stored.lastEntry());
+  }
+
+  /** Stores the package under (key, startTimestamp), replacing an earlier one. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, FixWindowPackage dp)
+      throws Exception {
+    data.computeIfAbsent(key, k -> new TreeMap<>()).put(startTimestamp, dp);
+  }
+
+  /** Stores the digest under (key, startTimestamp), replacing an earlier one. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, FloatDigest digest)
+      throws Exception {
+    digests.computeIfAbsent(key, k -> new TreeMap<>()).put(startTimestamp, digest);
+  }
+
+  /** Not implemented: the raw bytes are ignored. */
+  @Override
+  public void write(String key, String cf, long startTimestamp, byte[] digest) throws Exception {
+  }
+
+  /** Not implemented: always returns null. */
+  @Override
+  public byte[] getBytes(String key, String cf, long startTime) {
+    return null;
+  }
+
+  /** Unwraps a lookup result; a null entry (nothing found) yields null. */
+  private static <V> V valueOrNull(Map.Entry<Long, V> entry) {
+    return entry == null ? null : entry.getValue();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java
new file mode 100644
index 0000000..fc679c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/FixWindowPackage.java
@@ -0,0 +1,122 @@
+package org.apache.iotdb.db.index.storage.model;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.DataDigestUtil;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * The points of one key that fall into one time window. The window is the half-open range
+ * [start, end); points are kept sorted by timestamp.
+ */
+public class FixWindowPackage {
+
+  protected String key = null;
+  private Pair<Long, Long> timeWindow;
+  private TreeMap<Long, Object> treeMap;
+
+  /** Creates an empty package; {@code timeWindow} holds (start, end). */
+  public FixWindowPackage(String key, Pair<Long, Long> timeWindow) {
+    this.key = key;
+    this.timeWindow = timeWindow;
+    this.treeMap = new TreeMap<>();
+  }
+
+  /** Stores a (time, value) point, replacing any point already stored at that time. */
+  public void add(Pair<Long, Float> data) {
+    treeMap.put(data.left, data.right);
+  }
+
+  /** Stores a point, replacing any point already stored at that time. */
+  public void add(long time, float value) {
+    treeMap.put(time, value);
+  }
+
+  public boolean isEmpty() {
+    return treeMap.isEmpty();
+  }
+
+  /** Tells whether the point's timestamp lies inside [start, end). */
+  public boolean cover(Pair<Long, Float> data) {
+    long time = data.left;
+    return timeWindow.left <= time && time < timeWindow.right;
+  }
+
+  /** Tells whether the timestamp lies inside [start, end). */
+  public boolean cover(long time) {
+    return timeWindow.left <= time && time < timeWindow.right;
+  }
+
+  public long getStartTime() {
+    return timeWindow.left;
+  }
+
+  public long getEndTime() {
+    return timeWindow.right;
+  }
+
+  /** Number of points stored. */
+  public int size() {
+    return treeMap.size();
+  }
+
+  /** Length of the window (end minus start). */
+  public long getTimeWindow() {
+    return timeWindow.right - timeWindow.left;
+  }
+
+  /** Exposes the backing map itself, not a copy. */
+  public TreeMap<Long, Object> getData() {
+    return treeMap;
+  }
+
+  /** Digest over the whole window. */
+  public FloatDigest getDigest() {
+    return getDigest(timeWindow.left, timeWindow.right);
+  }
+
+  /**
+   * Digest of the points inside the query range, after the range has been clamped to this
+   * window by {@link #rangeRegular}. Both clamped bounds are inclusive.
+   *
+   * @return the digest, or {@code new FloatDigest(true)} when no point is in range
+   */
+  public FloatDigest getDigest(long startTime, long endTime) {
+    Pair<Long, Long> clamped =
+        rangeRegular(startTime, endTime, timeWindow.left, timeWindow.right);
+    // subMap excludes its upper bound, so +1 keeps the clamped end inclusive
+    SortedMap<Long, Object> inRange = treeMap.subMap(clamped.left, clamped.right + 1);
+    if (inRange.isEmpty()) {
+      return new FloatDigest(true);
+    }
+    return DataDigestUtil.getDigest(key, timeWindow.left, getTimeWindow(), inRange);
+  }
+
+  /**
+   * Clamps the query range to the closed range [startTime, endTime].
+   *
+   * <p>A query entirely after the range collapses to (endTime, endTime); one entirely before it
+   * collapses to (startTime, startTime). Otherwise each bound that is -1 or outside the range is
+   * replaced by the range bound, and the end is raised to the start if it would fall below it.
+   * NOTE(review): -1 looks like an "unbounded" marker, but a query end of -1 is caught by the
+   * "entirely before" test whenever startTime is non-negative -- confirm that this is intended.
+   */
+  public static Pair<Long, Long> rangeRegular(long queryStartTime, long queryEndTime,
+      long startTime, long endTime) {
+    if (queryStartTime > endTime) {
+      return new Pair<>(endTime, endTime);
+    }
+    if (queryEndTime < startTime) {
+      return new Pair<>(startTime, startTime);
+    }
+    long from = (queryStartTime == -1 || queryStartTime < startTime) ? startTime : queryStartTime;
+    long to = (queryEndTime == -1 || queryEndTime > endTime) ? endTime : queryEndTime;
+    return new Pair<>(from, Math.max(from, to));
+  }
+
+  @Override
+  public String toString() {
+    return treeMap.toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java
new file mode 100644
index 0000000..67d4f7e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializeFactory.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import org.apache.iotdb.db.index.FloatDigest;
+
+/**
+ * Singleton that routes a digest to the serializer registered for its concrete class.
+ */
+public class DigestSerializeFactory {
+
+  /** Simple class name handled by the float serializer. */
+  private static final String FLOAT = "FloatDigest";
+
+  private static final DigestSerializeFactory instance = new DigestSerializeFactory();
+
+  private final FloatDigestSerializer floatSerializer = FloatDigestSerializer.getInstance();
+
+  private DigestSerializeFactory() {
+  }
+
+  public static DigestSerializeFactory getInstance() {
+    return instance;
+  }
+
+  /**
+   * Serializes the digest.
+   *
+   * @return the encoded bytes, or {@code null} when the simple name of the digest's runtime
+   *     class is not {@code FloatDigest} (for example a subclass), as no serializer matches it
+   */
+  public byte[] serialize(FloatDigest dataDigest) {
+    if (FLOAT.equals(dataDigest.getClass().getSimpleName())) {
+      return floatSerializer.serialize(dataDigest);
+    }
+    return null;
+  }
+
+  /** Decodes a float digest; key and timestamp are passed through to the serializer. */
+  public FloatDigest deserializeFloatDigest(String key, long timestamp, byte[] bytes) {
+    return floatSerializer.deserialize(key, timestamp, bytes);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java
new file mode 100644
index 0000000..1cb8f8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/DigestSerializer.java
@@ -0,0 +1,47 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.util.ArrayList;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.MyBytes;
+
+/**
+ * Encodes the fields shared by all digests as three consecutive 8-byte longs: time window, code
+ * and serial.
+ *
+ * <p>The two directions are asymmetric on purpose: {@code serialize} emits the three longs
+ * alone, while {@code deserialize} expects them at offset 1, behind the one-byte tag that
+ * FloatDigestSerializer prepends. Bytes returned by {@code serialize} can therefore not be fed
+ * straight back into {@code deserialize}.
+ */
+public class DigestSerializer {
+
+  /** Size in bytes of one encoded long field. */
+  private static final int LONG_SIZE = 8;
+
+  /** Offset of the first field in the input of {@code deserialize}; skips the tag byte. */
+  private static final int HEADER_OFFSET = 1;
+
+  /** Returns time window, code and serial of the digest, without a leading tag byte. */
+  public byte[] serialize(FloatDigest floatDigest) {
+    ArrayList<byte[]> byteList = new ArrayList<>();
+    byteList.add(MyBytes.longToBytes(floatDigest.getTimeWindow()));
+    byteList.add(MyBytes.longToBytes(floatDigest.getCode()));
+    byteList.add(MyBytes.longToBytes(floatDigest.getSerial()));
+    return MyBytes.concatByteArrayList(byteList);
+  }
+
+  /**
+   * Rebuilds a digest from tagged bytes; key and start time are not stored and must be supplied.
+   */
+  public FloatDigest deserialize(String key, long startTime, byte[] bytes) {
+    long timeWindow = readLong(bytes, HEADER_OFFSET);
+    long code = readLong(bytes, HEADER_OFFSET + LONG_SIZE);
+    long serial = readLong(bytes, HEADER_OFFSET + 2 * LONG_SIZE);
+    return new FloatDigest(key, startTime, timeWindow, code, serial);
+  }
+
+  /** Reads one 8-byte long that starts at the given offset. */
+  private static long readLong(byte[] bytes, int offset) {
+    return MyBytes.bytesToLong(MyBytes.subBytes(bytes, offset, LONG_SIZE));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java
new file mode 100644
index 0000000..e66a121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FixWindowPackageSerializer.java
@@ -0,0 +1,60 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.iotdb.db.index.storage.model.FixWindowPackage;
+import org.apache.iotdb.db.index.utils.MyBytes;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Binary codec for FixWindowPackage. Layout: 8-byte window length, 4-byte point count, then per
+ * point an 8-byte timestamp followed by a 4-byte float value.
+ */
+public class FixWindowPackageSerializer {
+
+  protected static FixWindowPackageSerializer instance = new FixWindowPackageSerializer();
+
+  private FixWindowPackageSerializer() {
+  }
+
+  public static FixWindowPackageSerializer getInstance() {
+    return instance;
+  }
+
+  /** Encodes the package; its key and start time are not part of the output. */
+  public byte[] serialize(FixWindowPackage dataPackage) {
+    ArrayList<byte[]> chunks = new ArrayList<>();
+
+    // header: window length, then number of points
+    chunks.add(MyBytes.longToBytes(dataPackage.getTimeWindow()));
+    chunks.add(MyBytes.intToBytes(dataPackage.size()));
+
+    // body: one (timestamp, value) record per point, in the map's ascending time order
+    dataPackage.getData().forEach((time, value) -> {
+      chunks.add(MyBytes.longToBytes(time));
+      chunks.add(MyBytes.floatToBytes((float) value));
+    });
+
+    return MyBytes.concatByteArrayList(chunks);
+  }
+
+  /** Decodes bytes in the layout above; key and start time are supplied by the caller. */
+  public FixWindowPackage deserialize(String key, long startTime, byte[] bytes) {
+    long timeWindow = MyBytes.bytesToLong(MyBytes.subBytes(bytes, 0, 8));
+    int size = MyBytes.bytesToInt(MyBytes.subBytes(bytes, 8, 4));
+
+    FixWindowPackage pkg = new FixWindowPackage(key,
+        new Pair<Long, Long>(startTime, startTime + timeWindow));
+
+    // records start right after the 12-byte header and take 12 bytes each
+    int offset = 12;
+    for (int remaining = size; remaining > 0; remaining--) {
+      long time = MyBytes.bytesToLong(MyBytes.subBytes(bytes, offset, 8));
+      float value = MyBytes.bytesToFloat(MyBytes.subBytes(bytes, offset + 8, 4));
+      pkg.add(time, value);
+      offset += 12;
+    }
+    return pkg;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java
new file mode 100644
index 0000000..61a395c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/storage/model/serializer/FloatDigestSerializer.java
@@ -0,0 +1,64 @@
+package org.apache.iotdb.db.index.storage.model.serializer;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import org.apache.iotdb.db.index.FloatDigest;
+import org.apache.iotdb.db.index.utils.MyBytes;
+
+/**
+ * Binary codec for FloatDigest. Layout: 1 tag byte (value 2), the bytes written by
+ * DigestSerializer (three 8-byte longs), then max (4), min (4), count (8), avg (4) and finally
+ * the square sum as a string that runs to the end of the array.
+ */
+public class FloatDigestSerializer extends DigestSerializer {
+
+  protected static FloatDigestSerializer instance = new FloatDigestSerializer();
+
+  private FloatDigestSerializer() {
+  }
+
+  public static FloatDigestSerializer getInstance() {
+    return instance;
+  }
+
+  /** Encodes the digest: tag byte, common fields, then the float statistics. */
+  @Override
+  public byte[] serialize(FloatDigest floatDigest) {
+    ArrayList<byte[]> parts = new ArrayList<>();
+    parts.add(new byte[] {2});
+    parts.add(super.serialize(floatDigest));
+    parts.add(MyBytes.floatToBytes(floatDigest.getMax()));
+    parts.add(MyBytes.floatToBytes(floatDigest.getMin()));
+    parts.add(MyBytes.longToBytes(floatDigest.getCount()));
+    parts.add(MyBytes.floatToBytes(floatDigest.getAvg()));
+    parts.add(MyBytes.StringToBytes(floatDigest.getSquareSum().toString()));
+    return MyBytes.concatByteArrayList(parts);
+  }
+
+  /** Decodes the digest; key and start time are not stored and are supplied by the caller. */
+  @Override
+  public FloatDigest deserialize(String key, long startTime, byte[] bytes) {
+    // the superclass reads the three longs that follow the tag byte
+    FloatDigest digest = super.deserialize(key, startTime, bytes);
+
+    int offset = 1 + 8 + 8 + 8;
+    float max = MyBytes.bytesToFloat(MyBytes.subBytes(bytes, offset, 4));
+    offset += 4;
+    float min = MyBytes.bytesToFloat(MyBytes.subBytes(bytes, offset, 4));
+    offset += 4;
+    long count = MyBytes.bytesToLong(MyBytes.subBytes(bytes, offset, 8));
+    offset += 8;
+    float avg = MyBytes.bytesToFloat(MyBytes.subBytes(bytes, offset, 4));
+    offset += 4;
+    // everything that is left is the textual form of the square sum
+    byte[] squareSumText = MyBytes.subBytes(bytes, offset, bytes.length - offset);
+    BigDecimal squareSum = new BigDecimal(MyBytes.bytesToString(squareSumText));
+
+    digest.setMax(max);
+    digest.setMin(min);
+    digest.setCount(count);
+    digest.setAvg(avg);
+    digest.setSquareSum(squareSum);
+    return digest;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java b/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java
new file mode 100644
index 0000000..209d1fb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/DataDigestUtil.java
@@ -0,0 +1,128 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.Vector;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.iotdb.db.index.FloatDigest;
+
+/**
+ * Builds FloatDigest summaries (max, min, count, average, sum of squared deviations) from float
+ * values and merges several digests into one.
+ *
+ * <p>A value equal to {@code Float.MAX_VALUE} is skipped by every statistic.
+ */
+public class DataDigestUtil {
+
+  /**
+   * Merges the digests into one.
+   *
+   * @return the merged digest, or {@code null} when the array is empty
+   */
+  public static FloatDigest aggregate(String key, long startTime, FloatDigest[] dataDigests) {
+    if (dataDigests.length == 0) {
+      return null;
+    }
+    return floatAggregate(key, startTime, dataDigests);
+  }
+
+  /** Digests a collection whose elements must all be {@code Float} objects. */
+  public static FloatDigest getDigest(String key, long startTime, long timeWindow,
+      Collection<Object> datas) {
+    float[] floatDatas = ArrayUtils.toPrimitive(datas.toArray(new Float[]{}));
+    return floatDigest(key, startTime, timeWindow, floatDatas);
+  }
+
+  /** Digests the values of the map; the timestamps (keys) are not used. */
+  public static FloatDigest getDigest(String key, long startTime, long timeWindow,
+      SortedMap<Long, Object> dataPoints) {
+    return partFloatDigest(key, startTime, timeWindow, dataPoints);
+  }
+
+  /**
+   * Computes the digest of the given values.
+   *
+   * <p>When no value is usable the count is 0, the average is NaN, and max/min keep their start
+   * values {@code -Float.MAX_VALUE} and {@code Float.MAX_VALUE}.
+   */
+  public static FloatDigest floatDigest(String key, long startTime, long timeWindow,
+      float[] data) {
+    // Start max at the most negative finite float. Float.MIN_VALUE must not be used: it is the
+    // smallest POSITIVE float, so an all-negative input would report a positive maximum.
+    float max = -Float.MAX_VALUE;
+    float min = Float.MAX_VALUE;
+    long count = 0;
+    double sum = 0;
+    for (float value : data) {
+      if (value == Float.MAX_VALUE) {
+        continue;
+      }
+      if (max < value) {
+        max = value;
+      }
+      if (min > value) {
+        min = value;
+      }
+      ++count;
+      sum += value;
+    }
+    float avg = (float) (sum / count);
+
+    // second pass over the same values: squared deviation from the float-rounded average
+    BigDecimal squareSum = BigDecimal.ZERO;
+    for (float value : data) {
+      if (value != Float.MAX_VALUE) {
+        squareSum = squareSum.add(new BigDecimal(Math.pow(value - avg, 2.0)));
+      }
+    }
+    return new FloatDigest(key, startTime, timeWindow, max, min, count, avg, squareSum);
+  }
+
+  /** Same as floatDigest, taking the values from a map whose values must be {@code Float}. */
+  public static FloatDigest partFloatDigest(String key, long startTime, long timeWindow,
+      SortedMap<Long, Object> dataPoints) {
+    float[] values = new float[dataPoints.size()];
+    int next = 0;
+    for (Entry<Long, Object> dataPoint : dataPoints.entrySet()) {
+      values[next++] = (float) dataPoint.getValue();
+    }
+    return floatDigest(key, startTime, timeWindow, values);
+  }
+
+  /**
+   * Merges digests: time windows and counts are added, max/min are taken over all digests, and
+   * the average is the count-weighted mean of the averages.
+   *
+   * <p>NOTE(review): the square sums are simply added, which ignores the differences between the
+   * digests' averages; confirm whether consumers expect deviations around the merged average.
+   */
+  public static FloatDigest floatAggregate(String key, long startTime,
+      FloatDigest[] dataDigests) {
+    long timeWindow = 0L;
+    float max = -Float.MAX_VALUE;
+    float min = Float.MAX_VALUE;
+    long count = 0;
+    double sum = 0;
+    BigDecimal squareSum = BigDecimal.ZERO;
+    for (FloatDigest digest : dataDigests) {
+      timeWindow += digest.getTimeWindow();
+      if (max < digest.getMax()) {
+        max = digest.getMax();
+      }
+      if (min > digest.getMin()) {
+        min = digest.getMin();
+      }
+      if (digest.getCount() > 0) {
+        // a digest without values has a NaN average; NaN * 0 would poison the running sum
+        sum += digest.getAvg() * digest.getCount();
+      }
+      count += digest.getCount();
+      // BigDecimal is immutable: add() returns the sum, so it has to be assigned back
+      squareSum = squareSum.add(digest.getSquareSum());
+    }
+    float avg = (float) (sum / count);
+    return new FloatDigest(key, startTime, timeWindow, max, min, count, avg, squareSum);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java b/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java
new file mode 100644
index 0000000..f29c477
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/DigestUtil.java
@@ -0,0 +1,118 @@
+package org.apache.iotdb.db.index.utils;
+
+
+import com.clearspring.analytics.util.Pair;
+import org.apache.iotdb.db.index.storage.Config;
+
+/**
+ * Conversions between timestamps, leaf serial numbers (1-based, one per time window) and node
+ * codes of the digest forest.
+ */
+public class DigestUtil {
+
+  /** POW2_MINUS_1[i] == 2^i - 1 for i in 0..63 (the last entry is Long.MAX_VALUE). */
+  private static final long[] POW2_MINUS_1 = new long[64];
+
+  static {
+    for (int i = 0; i < POW2_MINUS_1.length; i++) {
+      // shifting stays exact for every i; Math.pow cast to long saturates from 2^63 on
+      POW2_MINUS_1[i] = (1L << i) - 1;
+    }
+  }
+
+  /**
+   * Code of the leaf with the given serial number: for an odd serial it is
+   * {@code getRootCodeBySerialNum(serial)}; for an even serial it is one more than the value for
+   * the preceding serial.
+   */
+  public static long serialToCode(long serialNumber) {
+    if (serialNumber % 2 == 1) {
+      return getRootCodeBySerialNum(serialNumber);
+    }
+    return getRootCodeBySerialNum(serialNumber - 1) + 1;
+  }
+
+  /**
+   * Finds the serial number whose root code equals {@code code}.
+   *
+   * @throws IllegalArgumentException if no serial number has that root code (this used to loop
+   *     without end)
+   */
+  public static long codeToSerial(long code) {
+    // root codes grow strictly with the serial number, so the scan may stop once it passes code
+    for (long serial = code / 2; ; ++serial) {
+      long rootCode = getRootCodeBySerialNum(serial);
+      if (rootCode == code) {
+        return serial;
+      }
+      if (rootCode > code) {
+        throw new IllegalArgumentException("no serial number has root code " + code);
+      }
+    }
+  }
+
+  /** Code of the leaf whose time window contains {@code time}. */
+  public static long timeToCode(long time) {
+    return serialToCode(timeToSerial(time));
+  }
+
+  /** Subtracts from {@code time} its remainder modulo {@code Config.timeWindow}. */
+  public static long regularTime(long time) {
+    return time - time % Config.timeWindow;
+  }
+
+  /** Returns {@code time / Config.timeWindow + 1}, the 1-based window index of the time. */
+  public static long timeToSerial(long time) {
+    return time / Config.timeWindow + 1;
+  }
+
+  public static void main(String[] args) {
+    System.out.println(getRootCodeBySerialNum(13));
+  }
+
+  /** Returns {@code 2 * serialNumber} minus the number of one-bits in {@code serialNumber}. */
+  public static long getRootCodeBySerialNum(long serialNumber) {
+    return (serialNumber << 1) - Long.bitCount(serialNumber);
+  }
+
+  /**
+   * Returns {@code 2^(depth + 1) - 2}.
+   *
+   * @param depth depth of the tree minus one
+   */
+  public static long getDistanceBetweenLeftestAndRoot(long depth) {
+    // 2L, not 2: with an int left operand the shift is done in 32 bits and overflows early
+    return (2L << depth) - 2L;
+  }
+
+  public static long getLeftestCode(long root, long rightestCode) {
+    long depth = root - rightestCode;
+    return root - getDistanceBetweenLeftestAndRoot(depth);
+  }
+
+  /**
+   * Returns the codes of the two children of {@code parentCode}. When the code reduces to 1 or
+   * 2 (presumably a leaf), both elements are {@code parentCode} itself.
+   */
+  public static Pair<Long, Long> getChildrenCode(long parentCode) {
+    long root = parentCode;
+    // long, not int: the offset is accumulated from long table entries and must not be truncated
+    long skew = 0;
+    while (true) {
+      if (root == 1 || root == 2) {
+        return new Pair<Long, Long>(skew + root, skew + root);
+      }
+      // smallest i with root <= 2^i - 1; always found because the last entry is Long.MAX_VALUE
+      int i = 0;
+      while (root > POW2_MINUS_1[i]) {
+        i++;
+      }
+      if (root == POW2_MINUS_1[i]) {
+        return new Pair<Long, Long>(root / 2 + skew, parentCode - 1);
+      }
+      // strip the largest 2^k - 1 below root and continue with the remainder
+      root -= POW2_MINUS_1[i - 1];
+      skew += POW2_MINUS_1[i - 1];
+    }
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java b/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java
new file mode 100644
index 0000000..a87ac49
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/ForestRootStack.java
@@ -0,0 +1,57 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.util.EmptyStackException;
+import java.util.Stack;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Stack of forest roots that also offers index access and popping the two newest roots at once.
+ */
+public class ForestRootStack<T> {
+
+  private final Stack<T> rootNodes = new Stack<>();
+
+  public ForestRootStack() {
+  }
+
+  /** Puts a root on top of the stack. */
+  public void push(T aRootNode) {
+    rootNodes.push(aRootNode);
+  }
+
+  /**
+   * Removes the two topmost roots.
+   *
+   * @return the pair (second from top, top), i.e. (left child, right child)
+   * @throws EmptyStackException if fewer than two roots are stored; the stack is not modified
+   */
+  public Pair<T, T> popPair() {
+    // check up front: popping one root and failing on the second would silently drop a root
+    if (rootNodes.size() < 2) {
+      throw new EmptyStackException();
+    }
+    T rightChild = rootNodes.pop();
+    T leftChild = rootNodes.pop();
+    return new Pair<>(leftChild, rightChild);
+  }
+
+  /** Lists the roots from bottom to top, each followed by a comma. */
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (T t : rootNodes) {
+      stringBuilder.append(t).append(",");
+    }
+    return stringBuilder.toString();
+  }
+
+  /** Number of roots stored. */
+  public int size() {
+    return rootNodes.size();
+  }
+
+  /** Returns the root at the given position, counted from the bottom of the stack (0). */
+  public T get(int index) {
+    return rootNodes.get(index);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java b/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java
new file mode 100644
index 0000000..b2233bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/index/utils/MyBytes.java
@@ -0,0 +1,255 @@
+package org.apache.iotdb.db.index.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+
+/**
+ * Static helpers that convert primitives and strings to and from byte arrays.
+ *
+ * <p>The byte order is not uniform across the helpers: {@code int} and {@code long} values are
+ * written most significant byte first, whereas {@code float} and {@code double} values are
+ * written with the least significant byte of their IEEE 754 bit pattern first. Each reader
+ * matches its own writer.
+ */
+public class MyBytes {
+
+  /**
+   * Converts an int to a 4-byte array, most significant byte first.
+   */
+  public static byte[] intToBytes(int i) {
+    byte[] result = new byte[4];
+    result[0] = (byte) ((i >> 24) & 0xFF);
+    result[1] = (byte) ((i >> 16) & 0xFF);
+    result[2] = (byte) ((i >> 8) & 0xFF);
+    result[3] = (byte) (i & 0xFF);
+    return result;
+  }
+
+  /**
+   * Converts the first four bytes of the array (most significant byte first) to an int.
+   */
+  public static int bytesToInt(byte[] bytes) {
+    int value = 0;
+    // high byte to low byte
+    for (int i = 0; i < 4; i++) {
+      int shift = (4 - 1 - i) * 8;
+      value |= (bytes[i] & 0xFF) << shift;
+    }
+    return value;
+  }
+
+  /**
+   * Converts a float to a new 4-byte array, least significant byte of its bit pattern first.
+   */
+  public static byte[] floatToBytes(float x) {
+    byte[] b = new byte[4];
+    floatToBytes(x, b, 0);
+    return b;
+  }
+
+  public static int floatSize() {
+    return 4;
+  }
+
+  /**
+   * Writes the 4 bytes of x into b starting at offset, least significant byte first.
+   *
+   * @param x value to encode
+   * @param b destination array; must have at least 4 bytes available from offset
+   * @param offset index of the first byte to write
+   */
+  public static void floatToBytes(float x, byte[] b, int offset) {
+    assert b.length - offset >= 4;
+    int l = Float.floatToIntBits(x);
+    for (int i = offset; i < 4 + offset; i++) {
+      // a narrowing cast keeps the low 8 bits, same as the deprecated new Integer(l).byteValue()
+      b[i] = (byte) l;
+      l = l >> 8;
+    }
+  }
+
+
+  /**
+   * Reads a float from the first four bytes of the array (least significant byte first).
+   */
+  public static float bytesToFloat(byte[] b) {
+    int bits = (b[0] & 0xFF)
+        | ((b[1] & 0xFF) << 8)
+        | ((b[2] & 0xFF) << 16)
+        | ((b[3] & 0xFF) << 24);
+    return Float.intBitsToFloat(bits);
+  }
+
+  /**
+   * Converts a double to a new 8-byte array, least significant byte of its bit pattern first.
+   *
+   * @return byte[]
+   */
+  public static byte[] doubleToBytes(double data) {
+    byte[] bytes = new byte[8];
+    long value = Double.doubleToLongBits(data);
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) value;
+      value = value >> 8;
+    }
+    return bytes;
+  }
+
+  public static int doubleSize() {
+    return 8;
+  }
+
+  /**
+   * Reads a double from the first eight bytes of the array (least significant byte first).
+   */
+  public static double bytesToDouble(byte[] bytes) {
+    long value = 0;
+    // fold from the most significant byte (index 7) down to index 0
+    for (int i = 7; i >= 0; i--) {
+      value = (value << 8) | (bytes[i] & 0xFFL);
+    }
+    return Double.longBitsToDouble(value);
+  }
+
+  /**
+   * Converts a boolean to a new 1-byte array: 1 for true, 0 for false.
+   */
+  public static byte[] boolToBytes(boolean x) {
+    return boolToBytes(x, new byte[1], 0);
+  }
+
+  public static int boolSize() {
+    return 1;
+  }
+
+  /**
+   * Writes the boolean into b at offset (1 for true, 0 for false) and returns b.
+   */
+  public static byte[] boolToBytes(boolean x, byte[] b, int offset) {
+    b[offset] = (byte) (x ? 1 : 0);
+    return b;
+  }
+
+  /**
+   * Reads a boolean from a 1-byte array.
+   *
+   * @return true only when the array has exactly one byte and that byte is non-zero
+   */
+  public static boolean bytesToBool(byte[] b) {
+    return b.length == 1 && b[0] != 0;
+  }
+
+  /**
+   * Converts a long to a new 8-byte array, most significant byte first.
+   */
+  public static byte[] longToBytes(long num) {
+    return longToBytes(num, new byte[8], 0);
+  }
+
+  public static int longSize() {
+    return 8;
+  }
+
+  /**
+   * Writes the 8 bytes of num into byteNum starting at offset_, most significant byte first,
+   * and returns byteNum.
+   */
+  public static byte[] longToBytes(long num, byte[] byteNum, int offset_) {
+    for (int ix = 0; ix < 8; ++ix) {
+      int offset = 64 - (ix + 1) * 8;
+      byteNum[ix + offset_] = (byte) ((num >> offset) & 0xff);
+    }
+    return byteNum;
+  }
+
+  /**
+   * Reads a long from the first eight bytes of the array (most significant byte first).
+   */
+  public static long bytesToLong(byte[] byteNum) {
+    long num = 0;
+    for (int ix = 0; ix < 8; ++ix) {
+      num <<= 8;
+      num |= (byteNum[ix] & 0xff);
+    }
+    return num;
+  }
+
+  /**
+   * Encodes the string as UTF-8.
+   */
+  public static byte[] StringToBytes(String str) {
+    // The Charset overload cannot fail with UnsupportedEncodingException, so no null fallback.
+    return str.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Decodes the bytes as a UTF-8 string.
+   */
+  public static String bytesToString(byte[] byteStr) {
+    return new String(byteStr, StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns a new array holding the bytes of a followed by the bytes of b.
+   */
+  public static byte[] concatByteArray(byte[] a, byte[] b) {
+    byte[] c = new byte[a.length + b.length];
+    System.arraycopy(a, 0, c, 0, a.length);
+    System.arraycopy(b, 0, c, a.length, b.length);
+    return c;
+  }
+
+  /**
+   * Returns a new array holding all arrays of the list concatenated in list order.
+   */
+  public static byte[] concatByteArrayList(List<byte[]> list) {
+    int len = 0;
+    for (byte[] cs : list) {
+      len += cs.length;
+    }
+    byte[] result = new byte[len];
+    int pos = 0;
+    // iterate instead of list.get(i) so linked lists are not traversed repeatedly
+    for (byte[] cs : list) {
+      System.arraycopy(cs, 0, result, pos, cs.length);
+      pos += cs.length;
+    }
+    return result;
+  }
+
+  /**
+   * Copies length bytes of src starting at start.
+   *
+   * @return the copied bytes, or null when length is not positive, start is negative, or the
+   *     requested range runs past the end of src
+   */
+  public static byte[] subBytes(byte[] src, int start, int length) {
+    // compare against src.length - start so that start + length cannot overflow
+    if (length <= 0 || start < 0 || length > src.length - start) {
+      return null;
+    }
+    byte[] result = new byte[length];
+    System.arraycopy(src, start, result, 0, length);
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
index 6b41a44..c0ad8a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
@@ -118,6 +118,16 @@ public class MLogWriter {
newLine();
}
+ public void addIndex(String path, String indexType) throws IOException {
+ writer.write(String.format("%s,%s,%s", MetadataOperationType.ADD_INDEX, path, indexType));
+ newLine();
+ }
+
+ public void dropIndex(String path, String indexType) throws IOException {
+ writer.write(String.format("%s,%s,%s", MetadataOperationType.DROP_INDEX, path, indexType));
+ newLine();
+ }
+
public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
File tmpLogFile = SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp");
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index e39d0ac..272b3d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
*/
package org.apache.iotdb.db.metadata;
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,7 +51,13 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -55,20 +86,6 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1140,7 +1157,7 @@ public class MManager {
* @return deviceId
*/
public String getDeviceId(String path) {
- MNode deviceNode = null;
+ MNode deviceNode;
try {
deviceNode = getDeviceNode(path);
path = deviceNode.getFullPath();
@@ -1667,6 +1684,48 @@ public class MManager {
}
/**
+ * Check whether the timeseries has index
+ */
+ public boolean checkIndex(String path, IndexType indexType) throws MetadataException {
+ // Read-only lookup, so only the read lock is taken.
+ lock.readLock().lock();
+ try {
+ // The index flag is looked up on the schema by the string form of the index type.
+ return mtree.getSchema(path).isIndexed(indexType.toString());
+ } finally {
+ // Always release the lock, including when the lookup throws.
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Create index for timeseries
+ */
+ public void addIndex(String path, IndexType indexType) throws MetadataException, IOException {
+ // The schema is modified, so the write lock is taken.
+ lock.writeLock().lock();
+ try {
+ // Mark the schema of this path as indexed for the given index type.
+ mtree.getSchema(path).setIndex(indexType.toString(), true);
+ // The operation is appended to the metadata log only when isRecovering is false.
+ // NOTE(review): presumably recovery replays entries that are already in the log - confirm.
+ if (!isRecovering) {
+ logWriter.addIndex(path, indexType.toString());
+ }
+ } finally {
+ // Always release the lock, including when the schema lookup or the log write throws.
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Drop index for one timeseries
+ */
+ public void dropIndex(String path, IndexType indexType) throws MetadataException, IOException {
+ // The schema is modified, so the write lock is taken.
+ lock.writeLock().lock();
+ try {
+ // Clear the index flag of this path for the given index type.
+ mtree.getSchema(path).setIndex(indexType.toString(), false);
+ // The operation is appended to the metadata log only when isRecovering is false.
+ // NOTE(review): presumably recovery replays entries that are already in the log - confirm.
+ if (!isRecovering) {
+ logWriter.dropIndex(path, indexType.toString());
+ }
+ } finally {
+ // Always release the lock, including when the schema lookup or the log write throws.
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
* Check whether the given path contains a storage group
*/
boolean checkStorageGroupByPath(String path) {
@@ -1828,15 +1887,10 @@ public class MManager {
}
/**
- * get schema for device.
- * Attention!!! Only support insertPlan and insertTabletsPlan
- * @param deviceId
- * @param measurementList
- * @param plan
- * @return
- * @throws MetadataException
+ * get schema for device. Attention!!! Only support insertPlan and insertTabletsPlan
*/
- public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+ public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList,
+ PhysicalPlan plan) throws MetadataException {
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
MNode deviceNode = null;
@@ -1851,7 +1905,8 @@ public class MManager {
// could not create it
if (!config.isAutoCreateSchemaEnabled()) {
throw new MetadataException(String.format(
- "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+ "Current deviceId[%s] does not contain measurement:%s", deviceId,
+ measurementList[i]));
}
// create it
@@ -1859,19 +1914,20 @@ public class MManager {
TSDataType dataType = getTypeInLoc(plan, i);
createTimeseries(
- path.getFullPath(),
- dataType,
- getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
+ path.getFullPath(),
+ dataType,
+ getDefaultEncoding(dataType),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
}
- MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+ MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode,
+ measurementList[i]);
// check type is match
TSDataType insertDataType = null;
if (plan instanceof InsertPlan) {
- if (!((InsertPlan)plan).isNeedInferType()) {
+ if (!((InsertPlan) plan).isNeedInferType()) {
// only when InsertPlan's values is object[], we should check type
insertDataType = getTypeInLoc(plan, i);
} else {
@@ -1883,11 +1939,11 @@ public class MManager {
if (measurementNode.getSchema().getType() != insertDataType) {
logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
- measurementList[i], insertDataType, measurementNode.getSchema().getType());
+ measurementList[i], insertDataType, measurementNode.getSchema().getType());
if (!config.isEnablePartialInsert()) {
throw new MetadataException(String.format(
- "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
- measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+ "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+ measurementList[i], insertDataType, measurementNode.getSchema().getType()));
} else {
// mark failed measurement
if (plan instanceof InsertTabletPlan) {
@@ -1910,7 +1966,7 @@ public class MManager {
}
} catch (MetadataException e) {
logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
- e.getMessage());
+ e.getMessage());
if (config.isEnablePartialInsert()) {
// mark failed measurement
if (plan instanceof InsertPlan) {
@@ -1926,13 +1982,14 @@ public class MManager {
return schemas;
}
- private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type) throws MetadataException {
+ private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type)
+ throws MetadataException {
plan.getTypes()[loc] = type;
try {
switch (type) {
case INT32:
plan.getValues()[loc] =
- Integer.parseInt(String.valueOf(plan.getValues()[loc]));
+ Integer.parseInt(String.valueOf(plan.getValues()[loc]));
break;
case INT64:
plan.getValues()[loc] =
@@ -1956,7 +2013,8 @@ public class MManager {
break;
}
} catch (ClassCastException e) {
- logger.error("inconsistent type between client and server for " + e.getMessage() + " " + type);
+ logger
+ .error("inconsistent type between client and server for " + e.getMessage() + " " + type);
throw new MetadataException(e.getMessage());
} catch (NumberFormatException e) {
logger.error("inconsistent type between type {} and value {}", type, plan.getValues()[loc]);
@@ -1984,49 +2042,43 @@ public class MManager {
return conf.getDefaultTextEncoding();
default:
throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataType.toString()));
+ String.format("Data type %s is not supported.", dataType.toString()));
}
}
/**
- * get dataType of plan, in loc measurements
- * only support InsertPlan and InsertTabletPlan
- * @param plan
- * @param loc
- * @return
- * @throws MetadataException
+ * get dataType of plan, in loc measurements only support InsertPlan and InsertTabletPlan
*/
private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
TSDataType dataType;
if (plan instanceof InsertPlan) {
InsertPlan tPlan = (InsertPlan) plan;
- dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+ dataType = TypeInferenceUtils
+ .getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
} else if (plan instanceof InsertTabletPlan) {
dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
- } else {
+ } else {
throw new MetadataException(String.format(
- "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+ "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
}
return dataType;
}
/**
- * when insert, we lock device node for not create deleted time series
- * before insert, we should call this function to lock the device node
- * @param deviceId
+ * when insert, we lock device node for not create deleted time series before insert, we should
+ * call this function to lock the device node
*/
public void lockInsert(String deviceId) throws MetadataException {
getDeviceNodeWithAutoCreateAndReadLock(deviceId);
}
/**
- * when insert, we lock device node for not create deleted time series
- * after insert, we should call this function to unlock the device node
- * @param deviceId
+ * when insert, we lock device node for not create deleted time series after insert, we should
+ * call this function to unlock the device node
*/
public void unlockInsert(String deviceId) {
try {
- MNode mNode =getDeviceNode(deviceId);
+ MNode mNode = getDeviceNode(deviceId);
mNode.readUnlock();
} catch (MetadataException e) {
// ignore the exception
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
index 3700972..98899c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
@@ -27,6 +27,8 @@ public class MetadataOperationType {
public static final String CREATE_TIMESERIES = "0";
public static final String DELETE_TIMESERIES = "1";
public static final String SET_STORAGE_GROUP = "2";
+ public static final String ADD_INDEX = "3";
+ public static final String DROP_INDEX = "4";
public static final String SET_TTL = "10";
public static final String DELETE_STORAGE_GROUP = "11";
public static final String CHANGE_OFFSET = "12";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index adb0b04..e78644a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,6 +18,43 @@
*/
package org.apache.iotdb.db.qp.executor;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -37,10 +74,16 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.index.IndexException;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.index.IndexManager;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.index.PisaIndex;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -49,8 +92,39 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.IndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -78,14 +152,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
@@ -201,6 +267,9 @@ public class PlanExecutor implements IPlanExecutor {
case CREATE_SCHEMA_SNAPSHOT:
operateCreateSnapshot();
return true;
+ case INDEX:
+ operateIndex((IndexPlan) plan);
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
@@ -834,6 +903,48 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ private void operateIndex(IndexPlan indexPlan) throws QueryProcessException {
+ try {
+ String path = indexPlan.getPaths().get(0).getFullPath();
+ IndexType indexType = indexPlan.getIndexType();
+ if (!mManager.isPathExist(path)) {
+ throw new PathNotExistException(path);
+ }
+ PisaIndex pisaIndex = IndexManager.getIndexInstance(indexType);
+ if (pisaIndex == null) {
+ throw new UnSupportedIndexTypeException(indexType.toString());
+ }
+ switch (indexPlan.getIndexOperatorType()) {
+ case CREATE_INDEX:
+ if (mManager.checkIndex(path, indexType)) {
+ throw new IndexException(
+ String.format("Timeseries %s has already been indexed.", path));
+ }
+
+ if (pisaIndex.build(new Path(path))) {
+ mManager.addIndex(path, indexType);
+ }
+ break;
+ case DROP_INDEX:
+ if (!mManager.checkIndex(path, indexType)) {
+ throw new IndexException(
+ String.format("Timeseries %s hasn't been indexed.", path));
+ }
+ if (pisaIndex.drop(new Path(path))) {
+ mManager.dropIndex(path, indexType);
+ }
+ break;
+ default:
+ throw new IndexException(String
+ .format("Not supported index operation %s", indexPlan.getIndexOperatorType()));
+ }
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
@Override
public void update(Path path, long startTime, long endTime, String value) {
throw new UnsupportedOperationException("update is not supported now");
@@ -856,14 +967,15 @@ public class PlanExecutor implements IPlanExecutor {
}
protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
- throws MetadataException {
- return mManager.getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+ throws MetadataException {
+ return mManager
+ .getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
}
protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan insertTabletPlan)
- throws MetadataException {
+ throws MetadataException {
return mManager.getSeriesSchemas(insertTabletPlan.getDeviceId(),
- insertTabletPlan.getMeasurements(), insertTabletPlan);
+ insertTabletPlan.getMeasurements(), insertTabletPlan);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java
new file mode 100644
index 0000000..0a6e5ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/IndexOperator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.crud;
+
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.qp.logical.RootOperator;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+/**
+ * Logical operator for an index statement: it carries the target path, the kind of index
+ * operation (create or drop) and the index type.
+ */
+public final class IndexOperator extends RootOperator {
+
+  /** Operations an index statement can request. */
+  public enum IndexOperatorType {
+    CREATE_INDEX, DROP_INDEX
+  }
+
+  private final IndexOperatorType indexOperatorType;
+  private final IndexType indexType;
+  private Path path;
+
+  /**
+   * @param tokenIntType token type passed to the parent operator
+   * @param path path the index operation applies to
+   * @param indexOperatorType whether the index is created or dropped
+   * @param indexType type of the index
+   */
+  public IndexOperator(int tokenIntType, Path path, IndexOperatorType indexOperatorType,
+      IndexType indexType) {
+    super(tokenIntType);
+    operatorType = OperatorType.INDEX;
+    this.indexOperatorType = indexOperatorType;
+    this.indexType = indexType;
+    this.path = path;
+  }
+
+  public IndexOperatorType getIndexOperatorType() {
+    return indexOperatorType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public void setPath(Path path) {
+    this.path = path;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java
new file mode 100644
index 0000000..0ce944d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/IndexPlan.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+
+import static org.apache.iotdb.db.qp.logical.Operator.OperatorType.INDEX;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator.IndexOperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+/**
+ * Physical plan for an index statement: it carries the target path, the kind of index
+ * operation (create or drop) and the index type.
+ */
+public class IndexPlan extends PhysicalPlan {
+
+  private final IndexOperatorType indexOperatorType;
+  private final IndexType indexType;
+  private Path path;
+
+  /**
+   * @param path path the index operation applies to
+   * @param indexOperatorType whether the index is created or dropped
+   * @param indexType type of the index
+   */
+  public IndexPlan(Path path, IndexOperatorType indexOperatorType, IndexType indexType) {
+    super(false, INDEX);
+    this.indexOperatorType = indexOperatorType;
+    this.indexType = indexType;
+    this.path = path;
+  }
+
+  /**
+   * @return a new list holding the plan's path, or a new empty list when the path is null
+   */
+  @Override
+  public List<Path> getPaths() {
+    List<Path> paths = new ArrayList<>();
+    if (path == null) {
+      return paths;
+    }
+    paths.add(path);
+    return paths;
+  }
+
+  public IndexOperatorType getIndexOperatorType() {
+    return indexOperatorType;
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 6b14f26..0600be0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -29,7 +29,9 @@ import java.util.Map;
import java.util.Set;
import org.antlr.v4.runtime.tree.TerminalNode;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.index.UnSupportedIndexTypeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.index.IndexManager.IndexType;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.RootOperator;
@@ -38,6 +40,8 @@ import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
import org.apache.iotdb.db.qp.logical.crud.FromOperator;
import org.apache.iotdb.db.qp.logical.crud.InOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator.IndexOperatorType;
import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectOperator;
@@ -79,6 +83,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AttributeClausesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ConstantContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountNodesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountTimeseriesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateIndexContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateRoleContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateSnapshotContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateTimeseriesContext;
@@ -87,6 +92,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DateExpressionContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStatementContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStorageGroupContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteTimeseriesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropIndexContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropRoleContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropUserContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FillClauseContext;
@@ -733,6 +739,34 @@ public class LogicalGenerator extends SqlBaseBaseListener {
initializedOperator = new ShowTTLOperator(storageGroups);
}
+  /**
+   * Builds the logical operator for a create-index statement.
+   *
+   * @param ctx parse context supplying the index type token and the full path
+   * @throws SQLParserException if the index type name is rejected with an
+   *     UnSupportedIndexTypeException
+   */
+  @Override
+  public void enterCreateIndex(CreateIndexContext ctx) {
+    super.enterCreateIndex(ctx);
+    // Resolve the index type first so that no operator state is touched when it is rejected.
+    IndexType indexType = parseIndexType(ctx.ID());
+    initializedOperator = new IndexOperator(SQLConstant.TOK_CREATE_INDEX,
+        parseFullPath(ctx.fullPath()), IndexOperatorType.CREATE_INDEX, indexType);
+    operatorType = SQLConstant.TOK_CREATE_INDEX;
+  }
+
+  /**
+   * Builds the logical operator for a drop-index statement.
+   *
+   * @param ctx parse context supplying the index type token and the full path
+   * @throws SQLParserException if the index type name is rejected with an
+   *     UnSupportedIndexTypeException
+   */
+  @Override
+  public void enterDropIndex(DropIndexContext ctx) {
+    super.enterDropIndex(ctx);
+    // Resolve the index type first so that no operator state is touched when it is rejected.
+    IndexType indexType = parseIndexType(ctx.ID());
+    initializedOperator = new IndexOperator(SQLConstant.TOK_DROP_INDEX,
+        parseFullPath(ctx.fullPath()), IndexOperatorType.DROP_INDEX, indexType);
+    operatorType = SQLConstant.TOK_DROP_INDEX;
+  }
+
+  /**
+   * Resolves the index type named by the ID token of a create/drop index statement.
+   *
+   * @param indexTypeNode the ID terminal node whose text names the index type
+   * @return the index type returned by {@code IndexType.getIndexType} for the token text
+   * @throws SQLParserException wrapping the UnSupportedIndexTypeException raised for the name
+   */
+  private IndexType parseIndexType(TerminalNode indexTypeNode) {
+    try {
+      return IndexType.getIndexType(indexTypeNode.getText());
+    } catch (UnSupportedIndexTypeException e) {
+      throw new SQLParserException(e);
+    }
+  }
+
private String[] parsePrivilege(PrivilegesContext ctx) {
List<TerminalNode> privilegeList = ctx.STRING_LITERAL();
List<String> privileges = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index a72fcb0..2eebfd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.qp.logical.crud.IndexOperator;
import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator;
@@ -69,6 +70,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.IndexPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -206,6 +208,10 @@ public class PhysicalGenerator {
String.format(
"not supported operator type %s in ttl operation.", operator.getType()));
}
+ case INDEX:
+ IndexOperator indexOperator = (IndexOperator) operator;
+ return new IndexPlan(indexOperator.getPath(),
+ indexOperator.getIndexOperatorType(), indexOperator.getIndexType());
case LOAD_CONFIGURATION:
LoadConfigurationOperatorType type = ((LoadConfigurationOperator) operator)
.getLoadConfigurationOperatorType();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index fd359e1..d503aaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -57,18 +57,21 @@ public class SchemaUtils {
logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(),
e);
}
-
}
public static List<TSDataType> getSeriesTypesByPath(Collection<Path> paths)
throws MetadataException {
List<TSDataType> dataTypes = new ArrayList<>();
for (Path path : paths) {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+ dataTypes.add(getSeriesTypeByPath(path));
}
return dataTypes;
}
+  /**
+   * Looks up the data type of a single series.
+   *
+   * @param path series path; its full path is handed to the MManager instance
+   * @return the series type the MManager instance reports for that full path
+   * @throws MetadataException declared for failures of the MManager lookup
+   */
+  public static TSDataType getSeriesTypeByPath(Path path) throws MetadataException {
+    MManager manager = MManager.getInstance();
+    return manager.getSeriesType(path.getFullPath());
+  }
+
/**
* @param paths time series paths
* @param aggregation aggregation function, may be null
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index cbd14d4..9638c45 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -280,7 +279,7 @@ public class MManagerBasicTest {
}
@Test
- public void testMaximalSeriesNumberAmongStorageGroup() throws MetadataException, PathException {
+ public void testMaximalSeriesNumberAmongStorageGroup() throws MetadataException {
MManager manager = MManager.getInstance();
assertEquals(0, manager.getMaximalSeriesNumberAmongStorageGroups());
manager.setStorageGroup("root.laptop");
diff --git a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
index 96af121..2ac6a79 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
@@ -49,7 +48,8 @@ public class GroupByEngineDataSetTest {
groupByTimePlan.setStartTime(startTime);
groupByTimePlan.setEndTime(endTime);
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+ groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -62,7 +62,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test2() throws IOException, PathException, StorageEngineException {
+ public void test2() throws IOException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 5;
@@ -77,7 +77,8 @@ public class GroupByEngineDataSetTest {
groupByTimePlan.setSlidingStep(slidingStep);
groupByTimePlan.setStartTime(startTime);
groupByTimePlan.setEndTime(endTime);
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+ groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -90,7 +91,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test3() throws IOException, PathException, StorageEngineException {
+ public void test3() throws IOException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -105,7 +106,8 @@ public class GroupByEngineDataSetTest {
groupByTimePlan.setSlidingStep(slidingStep);
groupByTimePlan.setStartTime(startTime);
groupByTimePlan.setEndTime(endTime);
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+ groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -118,7 +120,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test4() throws IOException, PathException, StorageEngineException {
+ public void test4() throws IOException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -133,7 +135,8 @@ public class GroupByEngineDataSetTest {
groupByTimePlan.setSlidingStep(slidingStep);
groupByTimePlan.setStartTime(startTime);
groupByTimePlan.setEndTime(endTime);
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+ groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -146,7 +149,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test5() throws IOException, PathException, StorageEngineException {
+ public void test5() throws IOException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -163,7 +166,8 @@ public class GroupByEngineDataSetTest {
groupByTimePlan.setEndTime(endTime);
ArrayList<Object> aggrList = new ArrayList<>();
aggrList.add(new CountAggrResult());
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId,
+ groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index a7f920b..d2c4943 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -59,7 +58,7 @@ public class SeriesAggregateReaderTest {
@Before
- public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 03bf2bf..b879685 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,7 +51,7 @@ public class SeriesReaderByTimestampTest {
private List<TsFileResource> unseqResources = new ArrayList<>();
@Before
- public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index 41bca61..861ef47 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.query.reader.series;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -57,7 +56,7 @@ public class SeriesReaderTest {
@Before
- public void setUp() throws MetadataException, PathException, IOException, WriteProcessException {
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 58177fd..5c5d462 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -60,7 +59,7 @@ public class SeriesReaderTestUtil {
public static void setUp(List<MeasurementSchema> measurementSchemas, List<String> deviceIds,
List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
- throws MetadataException, PathException, IOException, WriteProcessException {
+ throws MetadataException, IOException, WriteProcessException {
MManager.getInstance().init();
prepareSeries(measurementSchemas, deviceIds);
prepareFiles(seqResources, unseqResources, measurementSchemas, deviceIds);
@@ -150,7 +149,7 @@ public class SeriesReaderTestUtil {
private static void prepareSeries(List<MeasurementSchema> measurementSchemas,
- List<String> deviceIds) throws MetadataException, PathException {
+ List<String> deviceIds) throws MetadataException {
for (int i = 0; i < measurementNum; i++) {
measurementSchemas.add(new MeasurementSchema("sensor" + i, TSDataType.INT32,
encoding, CompressionType.UNCOMPRESSED));
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 08aa3ad..d9d27f8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -43,6 +43,8 @@ public enum TSStatusCode {
TSFILE_PROCESSOR_ERROR(314),
PATH_ILLEGAL(315),
LOAD_FILE_ERROR(316),
+ INDEX_ERROR(317),
+ UNSUPPORTED_INDEX_TYPE_ERROR(318),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index acec7b4..114707d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -165,6 +165,31 @@ public abstract class Statistics<T> {
}
}
+  /**
+   * Updates the statistics with a boxed value by dispatching to the typed {@code update}
+   * overload that corresponds to {@code type}.
+   *
+   * @param time timestamp forwarded to the typed overload
+   * @param value the value, cast to the wrapper class (or Binary) selected by {@code type}
+   * @param type data type deciding which cast and overload are used
+   * @throws UnknownColumnTypeException if {@code type} is none of the handled data types
+   */
+  public void update(long time, Object value, TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        update(time, ((Boolean) value).booleanValue());
+        return;
+      case INT32:
+        update(time, ((Integer) value).intValue());
+        return;
+      case INT64:
+        update(time, ((Long) value).longValue());
+        return;
+      case FLOAT:
+        update(time, ((Float) value).floatValue());
+        return;
+      case DOUBLE:
+        update(time, ((Double) value).doubleValue());
+        return;
+      case TEXT:
+        update(time, (Binary) value);
+        return;
+      default:
+        throw new UnknownColumnTypeException(type.toString());
+    }
+  }
+
public void update(long time, boolean value) {
if (time < this.startTime) {
startTime = time;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 307c694..ba93419 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -70,7 +70,7 @@ public class TimeFilter {
public static class TimeIn extends In {
private TimeIn(Set<Long> values, boolean not) {
- super(values, FilterType.TIME_FILTER,not);
+ super(values, FilterType.TIME_FILTER, not);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 848e0b3..05ad9db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -25,8 +25,10 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
@@ -44,12 +46,15 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
*/
public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
+ private static final long serialVersionUID = -2954441882827172377L;
+
private String measurementId;
private TSDataType type;
private TSEncoding encoding;
private TSEncodingBuilder encodingConverter;
private CompressionType compressor;
private Map<String, String> props = new HashMap<>();
+ private Set<String> indexSet;
public MeasurementSchema() {
}
@@ -88,6 +93,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
this.encoding = encoding;
this.props = props == null ? Collections.emptyMap() : props;
this.compressor = compressionType;
+ this.indexSet = new HashSet<>();
}
/**
@@ -172,6 +178,18 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
this.props = props;
}
+  /**
+   * Tells whether the given index type has been marked on this schema.
+   *
+   * @param indexType name of the index type to look up
+   * @return true if the index type is recorded in the index set; false otherwise, including when
+   *     the index set was never created (the no-arg constructor does not assign it)
+   */
+  public boolean isIndexed(String indexType) {
+    return indexSet != null && indexSet.contains(indexType);
+  }
+
+  /**
+   * Marks or unmarks an index type on this schema.
+   *
+   * @param indexType name of the index type
+   * @param isIndexed true to record the index type, false to remove it
+   */
+  public void setIndex(String indexType, boolean isIndexed) {
+    if (isIndexed) {
+      // The no-arg constructor leaves indexSet unassigned, so create it on first use.
+      if (indexSet == null) {
+        indexSet = new HashSet<>();
+      }
+      indexSet.add(indexType);
+    } else if (indexSet != null) {
+      indexSet.remove(indexType);
+    }
+  }
+
/**
* function for getting time encoder.
*/
@@ -183,6 +201,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
/**
* get Encoder of value from encodingConverter by measurementID and data type.
+ *
* @return Encoder for value
*/
public Encoder getValueEncoder() {
@@ -293,5 +312,4 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
sc.addTail("]");
return sc.toString();
}
-
}