You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/02/14 04:06:38 UTC
[iotdb] 04/32: unify
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f19b2e0ff887eb7be99597d506c30391042a0bfe
Author: Lei Rui <10...@qq.com>
AuthorDate: Wed Feb 2 20:39:24 2022 +0800
unify
---
.../query/groupby/MergeGroupByExecutor.java | 2 +-
.../query/groupby/RemoteGroupByExecutor.java | 2 +-
.../iotdb/cluster/query/reader/EmptyReader.java | 2 +-
example/session/pom.xml | 12 +-
.../iotdb/cpv/QueryFullGameExperimentCPV.java | 60 ------
.../macUDF/QueryFullGameExperimentMacUDF.java | 43 -----
.../iotdb/moc/QueryFullGameExperimentMOC.java | 57 ------
.../iotdb/queryExp/QueryFullGameExperiment.java | 97 ++++++++++
.../apache/iotdb/queryExp/QuerySyntheticData1.java | 111 +++++++++++
.../{ => writeData}/WriteFullGameToIoTDB.java | 2 +-
.../iotdb/writeData/WriteSyntheticData1.java | 47 +++++
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../db/query/dataset/groupby/GroupByExecutor.java | 2 +-
.../groupby/GroupByWithoutValueFilterDataSet.java | 17 +-
.../dataset/groupby/LocalGroupByExecutor.java | 6 +-
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 4 +-
.../iotdb/db/query/udf/builtin/UDTFM4MAC.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 8 +-
.../m4/{MyCPVTest1.java => MyTest1.java} | 41 ++--
.../m4/{MyCPVTest2.java => MyTest2.java} | 71 ++++---
.../apache/iotdb/db/integration/m4/MyTest3.java | 212 +++++++++++++++++++++
.../iotdb/tsfile/read/reader/page/PageReader.java | 6 +-
24 files changed, 588 insertions(+), 234 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutor.java
index f99a65f..e260113 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutor.java
@@ -85,7 +85,7 @@ public class MergeGroupByExecutor implements GroupByExecutor {
}
@Override
- public List<AggregateResult> calcResult4CPV(
+ public List<AggregateResult> calcResult(
long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException {
throw new IOException("no implemented");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index dfac66d..2421cc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -123,7 +123,7 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
}
@Override
- public List<AggregateResult> calcResult4CPV(
+ public List<AggregateResult> calcResult(
long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException {
throw new IOException("no implemented");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index fdce7cf..e61186e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -47,7 +47,7 @@ public class EmptyReader extends BaseManagedSeriesReader
private List<AggregateResult> aggregationResults = new ArrayList<>();
@Override
- public List<AggregateResult> calcResult4CPV(
+ public List<AggregateResult> calcResult(
long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException {
throw new IOException("no implemented");
diff --git a/example/session/pom.xml b/example/session/pom.xml
index a3ce2df..1b2f43f 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -26,8 +26,8 @@
<version>0.12.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>QueryFullGameExperiment</artifactId>
- <name>QueryFullGameExperiment</name>
+ <artifactId>QuerySyntheticData1</artifactId>
+ <name>QuerySyntheticData1</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
@@ -39,6 +39,12 @@
<artifactId>iotdb-session</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>0.12.4</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -55,7 +61,7 @@
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.iotdb.cpv.QueryFullGameExperimentCPV</mainClass>
+ <mainClass>org.apache.iotdb.queryExp.QuerySyntheticData1</mainClass>
</transformer>
</transformers>
</configuration>
diff --git a/example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java b/example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java
deleted file mode 100644
index 7bb65cf..0000000
--- a/example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.iotdb.cpv;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-import org.apache.thrift.TException;
-
-public class QueryFullGameExperimentCPV {
-
- private static final String queryFormat =
- "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
- + "from %s "
- + "group by ([%d, %d), %dns)";
- // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
- // => Don't change the sequence of the above six aggregates!
- // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by
- // IntervalLength!
-
- public static Session session;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, TException {
- int intervalNum = Integer.parseInt(args[0]);
- String measurement = "s6";
- String device = "root.game";
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- SessionDataSet dataSet;
- long minTime = 0L;
- long maxTime = 4264605928301L;
- long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
- maxTime = minTime + interval * intervalNum;
- String sql =
- String.format(
- queryFormat,
- measurement,
- measurement,
- measurement,
- measurement,
- measurement,
- measurement,
- device,
- minTime,
- maxTime,
- interval);
- dataSet = session.executeQueryStatement(sql);
- while (dataSet.hasNext()) {
- RowRecord r = dataSet.next();
- }
- session.executeNonQueryStatement("clear cache");
- dataSet = session.executeFinish();
- String info = dataSet.getFinishResult();
- System.out.println(info);
- dataSet.closeOperationHandle();
- session.close();
- }
-}
diff --git a/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java b/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java
deleted file mode 100644
index ff52b6c..0000000
--- a/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// package org.apache.iotdb.macUDF;
-//
-// import org.apache.iotdb.rpc.IoTDBConnectionException;
-// import org.apache.iotdb.rpc.StatementExecutionException;
-// import org.apache.iotdb.session.Session;
-// import org.apache.iotdb.session.SessionDataSet;
-// import org.apache.iotdb.tsfile.read.common.RowRecord;
-//
-// import org.apache.thrift.TException;
-//
-// public class QueryFullGameExperimentMacUDF {
-//
-// private static final String queryFormat =
-// "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and
-// time<%4$d";
-//
-// public static Session session;
-//
-// public static void main(String[] args)
-// throws IoTDBConnectionException, StatementExecutionException, TException {
-// int intervalNum = Integer.parseInt(args[0]);
-// String measurement = "s6";
-// String device = "root.game";
-// session = new Session("127.0.0.1", 6667, "root", "root");
-// session.open(false);
-// SessionDataSet dataSet;
-// long minTime = 0L;
-// long maxTime = 4264605928301L;
-// long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
-// maxTime = minTime + interval * intervalNum;
-// String sql = String.format(queryFormat, measurement, device, minTime, maxTime, intervalNum);
-// dataSet = session.executeQueryStatement(sql);
-// while (dataSet.hasNext()) {
-// RowRecord r = dataSet.next();
-// }
-// session.executeNonQueryStatement("clear cache");
-// dataSet = session.executeFinish();
-// String info = dataSet.getFinishResult();
-// System.out.println(info);
-// dataSet.closeOperationHandle();
-// session.close();
-// }
-// }
diff --git a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java b/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
deleted file mode 100644
index 7203eef..0000000
--- a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// package org.apache.iotdb.moc;
-//
-// import org.apache.iotdb.rpc.IoTDBConnectionException;
-// import org.apache.iotdb.rpc.StatementExecutionException;
-// import org.apache.iotdb.session.Session;
-// import org.apache.iotdb.session.SessionDataSet;
-// import org.apache.iotdb.tsfile.read.common.RowRecord;
-//
-// import org.apache.thrift.TException;
-//
-// public class QueryFullGameExperimentMOC {
-//
-// private static final String queryFormat =
-// "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s),
-// max_value(%s) "
-// + "from %s "
-// + "group by ([%d, %d), %dns)";
-//
-// public static Session session;
-//
-// public static void main(String[] args)
-// throws IoTDBConnectionException, StatementExecutionException, TException {
-// int intervalNum = Integer.parseInt(args[0]);
-// String measurement = "s6";
-// String device = "root.game";
-// session = new Session("127.0.0.1", 6667, "root", "root");
-// session.open(false);
-// SessionDataSet dataSet;
-// long minTime = 0L;
-// long maxTime = 4264605928301L;
-// long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
-// maxTime = minTime + interval * intervalNum;
-// String sql =
-// String.format(
-// queryFormat,
-// measurement,
-// measurement,
-// measurement,
-// measurement,
-// measurement,
-// measurement,
-// device,
-// minTime,
-// maxTime,
-// interval);
-// dataSet = session.executeQueryStatement(sql);
-// while (dataSet.hasNext()) {
-// RowRecord r = dataSet.next();
-// }
-// session.executeNonQueryStatement("clear cache");
-// dataSet = session.executeFinish();
-// String info = dataSet.getFinishResult();
-// System.out.println(info);
-// dataSet.closeOperationHandle();
-// session.close();
-// }
-// }
diff --git a/example/session/src/main/java/org/apache/iotdb/queryExp/QueryFullGameExperiment.java b/example/session/src/main/java/org/apache/iotdb/queryExp/QueryFullGameExperiment.java
new file mode 100644
index 0000000..1a97ebf
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/queryExp/QueryFullGameExperiment.java
@@ -0,0 +1,97 @@
+package org.apache.iotdb.queryExp;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+public class QueryFullGameExperiment {
+
+ // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
+ // => Don't change the sequence of the above six aggregates!
+ // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by
+ // IntervalLength!
+ // * (3) NOTE the time unit of interval. Update for different datasets!!!!!!!!!!!
+ private static final String queryFormat =
+ "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+ + "from %s "
+ + "group by ([%d, %d), %dns)";
+
+ private static final String queryFormat_UDF =
+ "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
+
+ public static Session session;
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException, TException {
+ // fix parameters for synthetic data1
+ String measurement = "s6";
+ String device = "root.game";
+ // fixed query total range
+ long minTime = 0L;
+ long maxTime = 4264605928301L; // unit:ns. Set in iotdb-engine.properties `timestamp_precision`.
+ // 实验自变量1:w数量
+ int intervalNum = Integer.parseInt(args[0]);
+ // 实验自变量2:方法
+ // 1: MAC, 2: MOC, 3: CPV
+ int approach = Integer.parseInt(args[1]);
+ if (approach != 1 && approach != 2 && approach != 3) {
+ throw new TException("Wrong input parameter approach!");
+ }
+ if (approach != 1) {
+ // MOC and CPV sql are the same sql: queryFormat.
+ // Set the server parameter in iotdb-engine.properties: enable_CPV=true for CPV, false for
+ // MOC.
+ if (approach == 2) { // MOC
+ System.out.println(
+ "MAKE SURE you have set the enable_CPV as false in `iotdb-engine.properties` for MOC!");
+ } else { // CPV
+ System.out.println(
+ "MAKE SURE you have set the enable_CPV as true in `iotdb-engine.properties` for CPV!");
+ }
+ }
+
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open(false);
+ session.setFetchSize(100000); // this is important. Set it big to avoid multiple fetch.
+
+ long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+ maxTime = minTime + interval * intervalNum;
+
+ String sql;
+ if (approach == 1) { // MAC UDF
+ sql =
+ String.format(queryFormat_UDF, measurement, device, minTime, maxTime, intervalNum); // MAC
+ } else {
+ // MOC and CPV sql use the same sql queryFormat.
+ sql =
+ String.format(
+ queryFormat,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ device,
+ minTime,
+ maxTime,
+ interval);
+ }
+
+ SessionDataSet dataSet;
+ dataSet = session.executeQueryStatement(sql);
+ while (dataSet.hasNext()) {
+ RowRecord r = dataSet.next();
+ }
+ session.executeNonQueryStatement("clear cache");
+ dataSet = session.executeFinish();
+ String info = dataSet.getFinishResult();
+ System.out.println(info);
+ dataSet.closeOperationHandle();
+ session.close();
+ }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1.java b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1.java
new file mode 100644
index 0000000..a444313
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/queryExp/QuerySyntheticData1.java
@@ -0,0 +1,111 @@
+package org.apache.iotdb.queryExp;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.thrift.TException;
+
+/**
+ * !!!!!!!Before query data, make sure check the following server parameters:
+ *
+ * <p>system_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/system
+ * data_dirs=/data3/ruilei/iotdb-server-0.12.4/synData1/data
+ * wal_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/wal timestamp_precision=ms
+ * unseq_tsfile_size=1073741824 # maximum size of unseq TsFile is 1024^3 Bytes
+ * seq_tsfile_size=1073741824 # maximum size of seq TsFile is 1024^3 Bytes
+ * avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+ * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+ * enable_unseq_compaction=false # unseq compaction is disabled
+ */
+public class QuerySyntheticData1 {
+
+ // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
+ // => Don't change the sequence of the above six aggregates!
+ // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by
+ // IntervalLength!
+ // * (3) NOTE the time unit of interval. Update for different datasets!!!!!!!!!!!
+ private static final String queryFormat =
+ "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+ + "from %s "
+ + "group by ([%d, %d), %dms)";
+
+ private static final String queryFormat_UDF =
+ "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
+
+ public static Session session;
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException, TException {
+ // fix parameters for synthetic data1
+ String measurement = "s0";
+ String device = "root.vehicle.d0";
+ // fixed query total range
+ long minTime = 0L;
+ long maxTime = 10000000L; // unit:ms. Set in iotdb-engine.properties `timestamp_precision`.
+ // 实验自变量1:w数量
+ // int intervalNum = Integer.parseInt(args[0]);
+ int intervalNum = 50;
+ // 实验自变量2:方法
+ // 1: MAC, 2: MOC, 3: CPV
+ // int approach = Integer.parseInt(args[1]);
+ int approach = 2;
+ if (approach != 1 && approach != 2 && approach != 3) {
+ throw new TException("Wrong input parameter approach!");
+ }
+ if (approach != 1) {
+ // MOC and CPV sql are the same sql: queryFormat.
+ // Set the server parameter in iotdb-engine.properties: enable_CPV=true for CPV, false for
+ // MOC.
+ if (approach == 2) { // MOC
+ System.out.println(
+ "MAKE SURE you have set the enable_CPV as false in `iotdb-engine.properties` for MOC!");
+ } else { // CPV
+ System.out.println(
+ "MAKE SURE you have set the enable_CPV as true in `iotdb-engine.properties` for CPV!");
+ }
+ }
+
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open(false);
+ session.setFetchSize(100000); // this is important. Set it big to avoid multiple fetch.
+
+ long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+ maxTime = minTime + interval * intervalNum;
+
+ String sql;
+ if (approach == 1) { // MAC UDF
+ sql =
+ String.format(queryFormat_UDF, measurement, device, minTime, maxTime, intervalNum); // MAC
+ } else {
+ // MOC and CPV sql use the same sql queryFormat.
+ sql =
+ String.format(
+ queryFormat,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ measurement,
+ device,
+ minTime,
+ maxTime,
+ interval);
+ }
+
+ SessionDataSet dataSet;
+ dataSet = session.executeQueryStatement(sql);
+ while (dataSet.hasNext()) {
+ RowRecord r = dataSet.next();
+ }
+ session.executeNonQueryStatement("clear cache");
+ dataSet = session.executeFinish();
+ String info = dataSet.getFinishResult();
+ System.out.println(info);
+ dataSet.closeOperationHandle();
+ session.close();
+ }
+}
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteFullGameToIoTDB.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteFullGameToIoTDB.java
similarity index 97%
rename from example/session/src/main/java/org/apache/iotdb/WriteFullGameToIoTDB.java
rename to example/session/src/main/java/org/apache/iotdb/writeData/WriteFullGameToIoTDB.java
index 263abe9..8880ab7 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteFullGameToIoTDB.java
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteFullGameToIoTDB.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb;
+package org.apache.iotdb.writeData;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
diff --git a/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
new file mode 100644
index 0000000..93d8bf5
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/writeData/WriteSyntheticData1.java
@@ -0,0 +1,47 @@
+package org.apache.iotdb.writeData;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class WriteSyntheticData1 {
+
+ /**
+ * !!!!!!!Before writing data, make sure check the following server parameters:
+ *
+ * <p>system_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/system
+ * data_dirs=/data3/ruilei/iotdb-server-0.12.4/synData1/data
+ * wal_dir=/data3/ruilei/iotdb-server-0.12.4/synData1/wal timestamp_precision=ms
+ * unseq_tsfile_size=1073741824 # maximum size of unseq TsFile is 1024^3 Bytes
+ * seq_tsfile_size=1073741824 # maximum size of seq TsFile is 1024^3 Bytes
+ * avg_series_point_number_threshold=10000 # each chunk contains 10000 data points
+ * compaction_strategy=NO_COMPACTION # compaction between levels is disabled
+ * enable_unseq_compaction=false # unseq compaction is disabled
+ */
+ public static final String device = "root.vehicle.d0";
+
+ public static final String measurements = "s0";
+
+ public static void main(String[] args)
+ throws IOException, IoTDBConnectionException, StatementExecutionException {
+
+ Session session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open(false);
+
+ for (long timestamp = 1; timestamp <= 10000000; timestamp++) {
+ double value = Math.random();
+ session.insertRecord(
+ device,
+ timestamp,
+ Collections.singletonList(measurements),
+ Collections.singletonList(TSDataType.DOUBLE),
+ value);
+ }
+ session.executeNonQueryStatement("flush");
+ session.close();
+ }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 28a3195..7580172 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -18,6 +18,11 @@
#
####################
+### enable CPV
+####################
+enable_CPV=false
+
+####################
### Web Page Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 756b88d..725c38d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -431,6 +431,8 @@ public class IoTDBConfig {
/** Is performance tracing enable. */
private boolean enablePerformanceTracing = false;
+ private boolean enableCPV = false;
+
/** The display of stat performance interval in ms. */
private long performanceStatDisplayInterval = 60000;
@@ -1380,10 +1382,18 @@ public class IoTDBConfig {
return enablePerformanceTracing;
}
+ public boolean isEnableCPV() {
+ return enableCPV;
+ }
+
public void setEnablePerformanceTracing(boolean enablePerformanceTracing) {
this.enablePerformanceTracing = enablePerformanceTracing;
}
+ public void setEnableCPV(boolean enableCPV) {
+ this.enableCPV = enableCPV;
+ }
+
public long getPerformanceStatDisplayInterval() {
return performanceStatDisplayInterval;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8df773d..2963fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -535,6 +535,10 @@ public class IoTDBDescriptor {
Boolean.toString(conf.isEnablePerformanceTracing()))
.trim()));
+ conf.setEnableCPV(
+ Boolean.parseBoolean(
+ properties.getProperty("enable_CPV", Boolean.toString(conf.isEnableCPV())).trim()));
+
conf.setPerformanceStatDisplayInterval(
Long.parseLong(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
index 86cb260..85f7e02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
@@ -36,7 +36,7 @@ public interface GroupByExecutor {
List<AggregateResult> calcResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException;
- List<AggregateResult> calcResult4CPV(
+ List<AggregateResult> calcResult(
long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 085d6ad..bd4e28e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -54,6 +56,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
private static final Logger logger =
LoggerFactory.getLogger(GroupByWithoutValueFilterDataSet.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
private Map<PartialPath, GroupByExecutor> pathExecutors = new HashMap<>();
/**
@@ -145,7 +149,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
GroupByExecutor executor = pathToExecutorEntry.getValue();
// long start = System.nanoTime();
List<AggregateResult> aggregations =
- executor.calcResult4CPV(curStartTime, curEndTime, startTime, endTime, interval);
+ executor.calcResult(curStartTime, curEndTime, startTime, endTime, interval);
// IOMonitor.incTotalTime(System.nanoTime() - start);
for (int i = 0; i < aggregations.size(); i++) {
int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
@@ -193,7 +197,14 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
- return new LocalGroupByExecutor4CPV(
- path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
+ if (CONFIG.isEnableCPV()) {
+ System.out.println("[[[[[M4]]]]] use LocalGroupByExecutor4CPV for CPV");
+ return new LocalGroupByExecutor4CPV(
+ path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
+ } else {
+ System.out.println("[[[[[M4]]]]] use LocalGroupByExecutor for MOC");
+ return new LocalGroupByExecutor(
+ path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 4403717..8e5021b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -184,14 +184,14 @@ public class LocalGroupByExecutor implements GroupByExecutor {
}
@Override
- public List<AggregateResult> calcResult4CPV(
- long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {
throw new IOException("no implemented");
}
@Override
- public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+ public List<AggregateResult> calcResult(
+ long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException {
// clear result cache
for (AggregateResult result : results) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index 1fa546c..157435a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -132,7 +132,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
* @param endTime open
*/
@Override
- public List<AggregateResult> calcResult4CPV(
+ public List<AggregateResult> calcResult(
long curStartTime, long curEndTime, long startTime, long endTime, long interval)
throws IOException, QueryProcessException {
// System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime +
@@ -348,7 +348,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
// + lastChunkMetadata.getVersion() + " " + lastChunkMetadata
// .getOffsetOfChunkHeader());
- currentChunkList.remove(listIdx[0]);
+ currentChunkList.remove(listIdx[1]);
List<IPageReader> pageReaderList =
FileLoaderUtils.loadPageReaderList(lastChunkMetadata, this.timeFilter);
for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
index 8d7207d..284ada3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
@@ -127,6 +127,7 @@ public class UDTFM4MAC implements UDTF {
for (int i = 0; i < w; i++) {
result[i] = "empty";
}
+ System.out.println("[[[[[M4]]]]] use UDTFM4MAC for MAC");
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e19063f..a3b1124 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -29,7 +29,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -633,7 +637,7 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
-
+ System.out.println("[[[[[M4]]]]] fetchSize=" + req.fetchSize);
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
statement,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
similarity index 96%
rename from server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java
rename to server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
index cc0c264..fa4001f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
@@ -19,14 +19,15 @@
package org.apache.iotdb.db.integration.m4;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
@@ -37,7 +38,7 @@ import java.util.Locale;
import static org.junit.Assert.fail;
-public class MyCPVTest1 {
+public class MyTest1 {
private static final String TIMESTAMP_STR = "Time";
@@ -52,21 +53,28 @@ public class MyCPVTest1 {
private static final String insertTemplate =
"INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
- @BeforeClass
- public static void setUp() throws Exception {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static boolean originalEnableCPV;
+ private static CompactionStrategy originalCompactionStrategy;
+
+ @Before
+ public void setUp() throws Exception {
+ originalCompactionStrategy = config.getCompactionStrategy();
+ config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+ originalEnableCPV = config.isEnableCPV();
+ // config.setEnableCPV(false); // MOC
+ config.setEnableCPV(true); // CPV
+
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ config.setCompactionStrategy(originalCompactionStrategy);
+ config.setEnableCPV(originalEnableCPV);
}
@Test
@@ -86,8 +94,7 @@ public class MyCPVTest1 {
boolean hasResultSet =
statement.execute(
"SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
- + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the
- // sequence!!!
+ + " FROM root.vehicle.d0 group by ([0,100),25ms)");
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
@@ -167,7 +174,7 @@ public class MyCPVTest1 {
String[] res =
new String[] {
"0,1,20,5,20,5[1],30[10]",
- "25,25,27,8,20,20[27],20[27]",
+ "25,25,27,8,20,8[25],20[27]",
"50,null,null,null,null,null,null",
"75,null,null,null,null,null,null"
};
@@ -261,7 +268,7 @@ public class MyCPVTest1 {
String[] res =
new String[] {
"0,1,20,5,20,5[1],30[10]",
- "25,25,27,8,20,20[27],20[27]",
+ "25,25,27,8,20,8[25],20[27]",
"50,null,null,null,null,null,null",
"75,null,null,null,null,null,null",
"100,120,120,8,8,8[120],8[120]",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
similarity index 91%
rename from server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java
rename to server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
index 60d25f9..13c9e62 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
@@ -37,7 +37,7 @@ import java.util.Locale;
import static org.junit.Assert.fail;
-public class MyCPVTest2 {
+public class MyTest2 {
private static final String TIMESTAMP_STR = "Time";
@@ -52,26 +52,43 @@ public class MyCPVTest2 {
private static final String insertTemplate =
"INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
- private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
- private static int avgSeriesPointNumberThreshold;
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static boolean originalEnableCPV;
+ private static CompactionStrategy originalCompactionStrategy;
+ private static int originalAvgSeriesPointNumberThreshold;
+ private static long originalSeqTsFileSize;
+ private static long originalUnSeqTsFileSize;
- @BeforeClass
- public static void setUp() throws Exception {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
- avgSeriesPointNumberThreshold = ioTDBConfig.getAvgSeriesPointNumberThreshold();
+ @Before
+ public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
+
+ originalEnableCPV = config.isEnableCPV();
+ originalCompactionStrategy = config.getCompactionStrategy();
+ originalAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
+ originalSeqTsFileSize = config.getSeqTsFileSize();
+ originalUnSeqTsFileSize = config.getUnSeqTsFileSize();
+
+ config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+ config.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
+ config.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
+ config.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
+
+ config.setEnableCPV(
+ true); // this test cannot be false, as the expected answer for bottomTime and topTime can
+ // be different
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
- ioTDBConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
+ config.setCompactionStrategy(originalCompactionStrategy);
+ config.setAvgSeriesPointNumberThreshold(originalAvgSeriesPointNumberThreshold);
+ config.setEnableCPV(originalEnableCPV);
+ config.setSeqTsFileSize(originalSeqTsFileSize);
+ config.setUnSeqTsFileSize(originalUnSeqTsFileSize);
}
@Test
@@ -134,10 +151,6 @@ public class MyCPVTest2 {
statement.execute(sql);
}
- ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
-
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
@@ -225,10 +238,6 @@ public class MyCPVTest2 {
statement.execute(sql);
}
- ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
-
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 5));
@@ -316,10 +325,6 @@ public class MyCPVTest2 {
statement.execute(sql);
}
- ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
-
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 5));
@@ -401,10 +406,6 @@ public class MyCPVTest2 {
statement.execute(sql);
}
- ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
-
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 1));
@@ -481,10 +482,6 @@ public class MyCPVTest2 {
statement.execute(sql);
}
- ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
- ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
-
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 1));
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java
new file mode 100644
index 0000000..d09b8d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyTest3 {
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ private static String[] creationSqls =
+ new String[] {
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE",
+ };
+
+ private final String d0s0 = "root.vehicle.d0.s0";
+
+ private static final String insertTemplate =
+ "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static boolean originalEnableCPV;
+ private static CompactionStrategy originalCompactionStrategy;
+
+ @Before
+ public void setUp() throws Exception {
+ originalCompactionStrategy = config.getCompactionStrategy();
+ config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+ originalEnableCPV = config.isEnableCPV();
+ // config.setEnableCPV(false); // MOC
+ config.setEnableCPV(true); // CPV
+
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ config.setCompactionStrategy(originalCompactionStrategy);
+ config.setEnableCPV(originalEnableCPV);
+ }
+
+ @Test
+ public void test1() {
+ prepareData1();
+
+ String[] res =
+ new String[] {
+ "0,1,20,5.0,20.0,5.0[1],30.0[10]",
+ "25,25,45,8.0,30.0,8.0[25],40.0[30]",
+ "50,52,54,8.0,18.0,8.0[52],18.0[54]",
+ "75,null,null,null,null,null,null"
+ };
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+ + " FROM root.vehicle.d0 group by ([0,100),25ms)");
+
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int i = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(String.format("min_time(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("max_time(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("first_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("last_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("min_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("max_value(%s)", d0s0));
+ System.out.println(ans);
+ Assert.assertEquals(res[i++], ans);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void prepareData1() {
+ // data:
+ // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : creationSqls) {
+ statement.execute(sql);
+ }
+
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8));
+ statement.execute("FLUSH");
+
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20));
+ statement.execute("FLUSH");
+
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20));
+ statement.execute("FLUSH");
+
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18));
+ statement.execute("FLUSH");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void test1_2() { // test UDF MAC
+ prepareData1();
+
+ String[] res =
+ new String[] {
+ "0,FirstPoint=(1,5.0), LastPoint=(20,20.0), BottomPoint=(1,5.0), TopPoint=(10,30.0)",
+ "25,FirstPoint=(25,8.0), LastPoint=(45,30.0), BottomPoint=(25,8.0), TopPoint=(30,40.0)",
+ "50,FirstPoint=(52,8.0), LastPoint=(54,18.0), BottomPoint=(52,8.0), TopPoint=(54,18.0)",
+ "75,empty"
+ };
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ long tqs = 0L;
+ long tqe = 100L;
+ int w = 4;
+ boolean hasResultSet =
+ statement.execute(
+ String.format(
+ "select M4(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where "
+ + "time>=%1$d and time<%2$d",
+ tqs, tqe, w));
+
+ String columnName = "M4(root.vehicle.d0.s0, \"tqs\"=\"%d\", \"tqe\"=\"%d\", \"w\"=\"%d\")";
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int i = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(String.format(columnName, tqs, tqe, w));
+ System.out.println(ans);
+ Assert.assertEquals(res[i++], ans);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 237e1e8..a3bfab0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -214,10 +214,12 @@ public class PageReader implements IPageReader {
int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
int num = (int) Math.floor((endTime - startTime) * 1.0 / interval);
for (int i = 0; i < num; i++) {
- if (splitBatchDataMap.containsKey(i) && i == curIdx) {
+ if (splitBatchDataMap.containsKey(i) && i == curIdx && !splitBatchDataMap.get(i).isEmpty()) {
currentChunkList.add(
new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
- } else if (splitBatchDataMap.containsKey(i) && i != curIdx) {
+ } else if (splitBatchDataMap.containsKey(i)
+ && i != curIdx
+ && !splitBatchDataMap.get(i).isEmpty()) {
futureChunkList.add(
new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
}