You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/02/14 04:06:35 UTC
[iotdb] 01/32: init CPV
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d6be84bc14ff5c02c3b9158b4da09ee372b23e32
Author: Lei Rui <10...@qq.com>
AuthorDate: Wed Feb 2 00:50:38 2022 +0800
init CPV
---
.../query/groupby/RemoteGroupByExecutor.java | 7 +
example/session/pom.xml | 2 +-
.../src/main/java/org/apache/iotdb/MyTest.java | 158 ++---
.../QueryFullGameExperimentCPV.java} | 7 +-
.../iotdb/mac/QueryFullGameExperimentMAC.java | 94 +--
.../macUDF/QueryFullGameExperimentMacUDF.java | 84 +--
.../iotdb/moc/QueryFullGameExperimentMOC.java | 112 ++--
.../db/query/dataset/groupby/GroupByExecutor.java | 3 +
.../groupby/GroupByWithoutValueFilterDataSet.java | 30 +-
.../dataset/groupby/LocalGroupByExecutor.java | 6 +
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 702 +++++++++++++++++++++
.../iotdb/db/query/reader/series/SeriesReader.java | 61 ++
.../reader/universal/PriorityMergeReader.java | 28 +
.../apache/iotdb/db/integration/m4/MyCPVTest1.java | 546 ++++++++++++++++
.../m4/{MyTestM4.java => MyCPVTest2.java} | 150 ++---
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 4 +
.../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 82 +++
.../iotdb/tsfile/read/reader/page/PageReader.java | 115 ++++
18 files changed, 1852 insertions(+), 339 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index 3b01e54..e1069d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
import org.apache.iotdb.db.utils.SerializeUtils;
@@ -122,6 +123,12 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
}
@Override
+ public List<AggregateResult> calcResult4CPV(long curStartTime, long curEndTime, long startTime,
+ long endTime, long interval) throws IOException, QueryProcessException {
+ throw new IOException("no implemented");
+ }
+
+ @Override
public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
throws IOException {
ByteBuffer aggrBuffer;
diff --git a/example/session/pom.xml b/example/session/pom.xml
index c5fcb73..a3ce2df 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -55,7 +55,7 @@
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.iotdb.mac.QueryFullGameExperimentMAC</mainClass>
+ <mainClass>org.apache.iotdb.cpv.QueryFullGameExperimentCPV</mainClass>
</transformer>
</transformers>
</configuration>
diff --git a/example/session/src/main/java/org/apache/iotdb/MyTest.java b/example/session/src/main/java/org/apache/iotdb/MyTest.java
index e553be0..b49afd9 100644
--- a/example/session/src/main/java/org/apache/iotdb/MyTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/MyTest.java
@@ -1,4 +1,4 @@
-// package org.apache.iotdb;/*
+//package org.apache.iotdb;/*
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
@@ -17,93 +17,93 @@
// * under the License.
// */
//
-// import org.apache.iotdb.db.conf.IoTDBConfig;
-// import org.apache.iotdb.db.conf.IoTDBDescriptor;
-// import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
-// import org.apache.iotdb.db.utils.EnvironmentUtils;
-// import org.apache.iotdb.jdbc.Config;
-// import org.junit.AfterClass;
-// import org.junit.BeforeClass;
-// import org.junit.Test;
+//import java.sql.Connection;
+//import java.sql.DriverManager;
+//import java.sql.Statement;
+//import java.util.Locale;
+//import org.apache.iotdb.db.conf.IoTDBConfig;
+//import org.apache.iotdb.db.conf.IoTDBDescriptor;
+//import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+//import org.apache.iotdb.db.utils.EnvironmentUtils;
+//import org.apache.iotdb.jdbc.Config;
+//import org.junit.AfterClass;
+//import org.junit.BeforeClass;
+//import org.junit.Test;
//
-// import java.sql.Connection;
-// import java.sql.DriverManager;
-// import java.sql.Statement;
-// import java.util.Locale;
+//public class MyTest {
//
-// public class MyTest {
-// private static String[] creationSqls =
-// new String[]{
-// "SET STORAGE GROUP TO root.vehicle.d0",
-// "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT, ENCODING=plain",
-// };
+// private static String[] creationSqls =
+// new String[]{
+// "SET STORAGE GROUP TO root.vehicle.d0",
+// "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT, ENCODING=plain",
+// };
//
-// private final String d0s0 = "root.vehicle.d0.s0";
+// private final String d0s0 = "root.vehicle.d0.s0";
//
-// private static final String insertTemplate =
-// "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
+// private static final String insertTemplate =
+// "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
//
-// private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
-// private static int avgSeriesPointNumberThreshold;
+// private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+// private static int avgSeriesPointNumberThreshold;
//
-// @BeforeClass
-// public static void setUp() throws Exception {
-// avgSeriesPointNumberThreshold = ioTDBConfig.getAvgSeriesPointNumberThreshold();
+// @BeforeClass
+// public static void setUp() throws Exception {
+// avgSeriesPointNumberThreshold = ioTDBConfig.getAvgSeriesPointNumberThreshold();
//
+// IoTDBDescriptor.getInstance().getConfig()
+// .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
//
-// IoTDBDescriptor.getInstance().getConfig().setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+// EnvironmentUtils.envSetUp();
//
-// EnvironmentUtils.envSetUp();
+// Class.forName(Config.JDBC_DRIVER_NAME);
+// }
//
-// Class.forName(Config.JDBC_DRIVER_NAME);
-// }
-//
-// @AfterClass
-// public static void tearDown() throws Exception {
+// @AfterClass
+// public static void tearDown() throws Exception {
//// EnvironmentUtils.cleanEnv();
-// IoTDBDescriptor.getInstance()
-// .getConfig()
-// .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
-//
-// ioTDBConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
-// }
-//
-// @Test
-// public void test1() {
-// prepareData1();
-// }
-//
-// private static void prepareData1() {
-// try (Connection connection =
-// DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
-// Statement statement = connection.createStatement()) {
-//
-// String[] creationSqls =
-// new String[]{
-// "SET STORAGE GROUP TO root.vehicle.d0",
-// "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT,
-// ENCODING=plain",
-// };
-// for (String sql : creationSqls) {
-// statement.execute(sql);
-// }
-//
-// String insertTemplate =
-// "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
-//
-// ioTDBConfig.setSeqTsFileSize(1024*1024*1024);// 1G
-// ioTDBConfig.setUnSeqTsFileSize(1024*1024*1024); // 1G
-// ioTDBConfig.setAvgSeriesPointNumberThreshold(10000); // this step cannot be omitted
-//
-// for (int i = 1; i <= 50000; i++) {
-// statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i,
-// Math.random()));
-// }
-// statement.executeBatch();
-// statement.clearBatch();
-// statement.execute("flush");
-// } catch (Exception e) {
-// e.printStackTrace();
-// }
+// IoTDBDescriptor.getInstance()
+// .getConfig()
+// .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+//
+// ioTDBConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
+// }
+//
+// @Test
+// public void test1() {
+// prepareData1();
+// }
+//
+// private static void prepareData1() {
+// try (Connection connection =
+// DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+// Statement statement = connection.createStatement()) {
+//
+// String[] creationSqls =
+// new String[]{
+// "SET STORAGE GROUP TO root.vehicle.d0",
+// "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT,
+// ENCODING = plain",
+// };
+// for (String sql : creationSqls) {
+// statement.execute(sql);
+// }
+//
+// String insertTemplate =
+// "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
+//
+// ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024);// 1G
+// ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
+// ioTDBConfig.setAvgSeriesPointNumberThreshold(10000); // this step cannot be omitted
+//
+// for (int i = 1; i <= 50000; i++) {
+// statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i,
+// Math.random()));
+// }
+// statement.executeBatch();
+// statement.clearBatch();
+// statement.execute("flush");
+// } catch (Exception e) {
+// e.printStackTrace();
// }
-// }
+// }
+//}
diff --git a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java b/example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java
similarity index 83%
copy from example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
copy to example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java
index 4926199..0892986 100644
--- a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
+++ b/example/session/src/main/java/org/apache/iotdb/cpv/QueryFullGameExperimentCPV.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.moc;
+package org.apache.iotdb.cpv;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -8,12 +8,15 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.thrift.TException;
-public class QueryFullGameExperimentMOC {
+public class QueryFullGameExperimentCPV {
private static final String queryFormat =
"select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+ "from %s "
+ "group by ([%d, %d), %dns)";
+ // * (1) min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s)
+ // => Do not change the order of the six aggregates above.
+ // * (2) group by ([tqs,tqe),IntervalLength) => Make sure (tqe-tqs) is divisible by IntervalLength.
public static Session session;
diff --git a/example/session/src/main/java/org/apache/iotdb/mac/QueryFullGameExperimentMAC.java b/example/session/src/main/java/org/apache/iotdb/mac/QueryFullGameExperimentMAC.java
index f070d21..bcad979 100644
--- a/example/session/src/main/java/org/apache/iotdb/mac/QueryFullGameExperimentMAC.java
+++ b/example/session/src/main/java/org/apache/iotdb/mac/QueryFullGameExperimentMAC.java
@@ -1,47 +1,47 @@
-package org.apache.iotdb.mac;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-import org.apache.thrift.TException;
-
-public class QueryFullGameExperimentMAC {
-
- private static final String queryFormat =
- "select %s " + "from %s " + "where time >= %d and time < %d";
-
- public static Session session;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, TException {
- int intervalNum = Integer.parseInt(args[0]);
- String measurement = "s6";
- String device = "root.game";
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- SessionDataSet dataSet;
- long minTime = 0L;
- long maxTime = 4264605928301L;
- long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
- maxTime = minTime + interval * intervalNum;
-
- for (int i = 0; i < intervalNum; i++) {
- long start = i * interval;
- long end = (i + 1) * interval;
- String sql = String.format(queryFormat, measurement, device, start, end);
- dataSet = session.executeQueryStatement(sql);
- while (dataSet.hasNext()) {
- RowRecord r = dataSet.next();
- }
- }
- session.executeNonQueryStatement("clear cache");
- dataSet = session.executeFinish();
- String info = dataSet.getFinishResult();
- System.out.println(info);
- dataSet.closeOperationHandle();
- session.close();
- }
-}
+//package org.apache.iotdb.mac;
+//
+//import org.apache.iotdb.rpc.IoTDBConnectionException;
+//import org.apache.iotdb.rpc.StatementExecutionException;
+//import org.apache.iotdb.session.Session;
+//import org.apache.iotdb.session.SessionDataSet;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//
+//import org.apache.thrift.TException;
+//
+//public class QueryFullGameExperimentMAC {
+//
+// private static final String queryFormat =
+// "select %s " + "from %s " + "where time >= %d and time < %d";
+//
+// public static Session session;
+//
+// public static void main(String[] args)
+// throws IoTDBConnectionException, StatementExecutionException, TException {
+// int intervalNum = Integer.parseInt(args[0]);
+// String measurement = "s6";
+// String device = "root.game";
+// session = new Session("127.0.0.1", 6667, "root", "root");
+// session.open(false);
+// SessionDataSet dataSet;
+// long minTime = 0L;
+// long maxTime = 4264605928301L;
+// long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+// maxTime = minTime + interval * intervalNum;
+//
+// for (int i = 0; i < intervalNum; i++) {
+// long start = i * interval;
+// long end = (i + 1) * interval;
+// String sql = String.format(queryFormat, measurement, device, start, end);
+// dataSet = session.executeQueryStatement(sql);
+// while (dataSet.hasNext()) {
+// RowRecord r = dataSet.next();
+// }
+// }
+// session.executeNonQueryStatement("clear cache");
+// dataSet = session.executeFinish();
+// String info = dataSet.getFinishResult();
+// System.out.println(info);
+// dataSet.closeOperationHandle();
+// session.close();
+// }
+//}
diff --git a/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java b/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java
index 334e19d..32c9c8b 100644
--- a/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java
+++ b/example/session/src/main/java/org/apache/iotdb/macUDF/QueryFullGameExperimentMacUDF.java
@@ -1,42 +1,42 @@
-package org.apache.iotdb.macUDF;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-import org.apache.thrift.TException;
-
-public class QueryFullGameExperimentMacUDF {
-
- private static final String queryFormat =
- "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
-
- public static Session session;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, TException {
- int intervalNum = Integer.parseInt(args[0]);
- String measurement = "s6";
- String device = "root.game";
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- SessionDataSet dataSet;
- long minTime = 0L;
- long maxTime = 4264605928301L;
- long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
- maxTime = minTime + interval * intervalNum;
- String sql = String.format(queryFormat, measurement, device, minTime, maxTime, intervalNum);
- dataSet = session.executeQueryStatement(sql);
- while (dataSet.hasNext()) {
- RowRecord r = dataSet.next();
- }
- session.executeNonQueryStatement("clear cache");
- dataSet = session.executeFinish();
- String info = dataSet.getFinishResult();
- System.out.println(info);
- dataSet.closeOperationHandle();
- session.close();
- }
-}
+//package org.apache.iotdb.macUDF;
+//
+//import org.apache.iotdb.rpc.IoTDBConnectionException;
+//import org.apache.iotdb.rpc.StatementExecutionException;
+//import org.apache.iotdb.session.Session;
+//import org.apache.iotdb.session.SessionDataSet;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//
+//import org.apache.thrift.TException;
+//
+//public class QueryFullGameExperimentMacUDF {
+//
+// private static final String queryFormat =
+// "select M4(%1$s,'tqs'='%3$d','tqe'='%4$d','w'='%5$d') from %2$s where time>=%3$d and time<%4$d";
+//
+// public static Session session;
+//
+// public static void main(String[] args)
+// throws IoTDBConnectionException, StatementExecutionException, TException {
+// int intervalNum = Integer.parseInt(args[0]);
+// String measurement = "s6";
+// String device = "root.game";
+// session = new Session("127.0.0.1", 6667, "root", "root");
+// session.open(false);
+// SessionDataSet dataSet;
+// long minTime = 0L;
+// long maxTime = 4264605928301L;
+// long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+// maxTime = minTime + interval * intervalNum;
+// String sql = String.format(queryFormat, measurement, device, minTime, maxTime, intervalNum);
+// dataSet = session.executeQueryStatement(sql);
+// while (dataSet.hasNext()) {
+// RowRecord r = dataSet.next();
+// }
+// session.executeNonQueryStatement("clear cache");
+// dataSet = session.executeFinish();
+// String info = dataSet.getFinishResult();
+// System.out.println(info);
+// dataSet.closeOperationHandle();
+// session.close();
+// }
+//}
diff --git a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java b/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
index 4926199..c08d21d 100644
--- a/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
+++ b/example/session/src/main/java/org/apache/iotdb/moc/QueryFullGameExperimentMOC.java
@@ -1,56 +1,56 @@
-package org.apache.iotdb.moc;
-
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-import org.apache.thrift.TException;
-
-public class QueryFullGameExperimentMOC {
-
- private static final String queryFormat =
- "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
- + "from %s "
- + "group by ([%d, %d), %dns)";
-
- public static Session session;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, TException {
- int intervalNum = Integer.parseInt(args[0]);
- String measurement = "s6";
- String device = "root.game";
- session = new Session("127.0.0.1", 6667, "root", "root");
- session.open(false);
- SessionDataSet dataSet;
- long minTime = 0L;
- long maxTime = 4264605928301L;
- long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
- maxTime = minTime + interval * intervalNum;
- String sql =
- String.format(
- queryFormat,
- measurement,
- measurement,
- measurement,
- measurement,
- measurement,
- measurement,
- device,
- minTime,
- maxTime,
- interval);
- dataSet = session.executeQueryStatement(sql);
- while (dataSet.hasNext()) {
- RowRecord r = dataSet.next();
- }
- session.executeNonQueryStatement("clear cache");
- dataSet = session.executeFinish();
- String info = dataSet.getFinishResult();
- System.out.println(info);
- dataSet.closeOperationHandle();
- session.close();
- }
-}
+//package org.apache.iotdb.moc;
+//
+//import org.apache.iotdb.rpc.IoTDBConnectionException;
+//import org.apache.iotdb.rpc.StatementExecutionException;
+//import org.apache.iotdb.session.Session;
+//import org.apache.iotdb.session.SessionDataSet;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//
+//import org.apache.thrift.TException;
+//
+//public class QueryFullGameExperimentMOC {
+//
+// private static final String queryFormat =
+// "select min_time(%s), max_time(%s), first_value(%s), last_value(%s), min_value(%s), max_value(%s) "
+// + "from %s "
+// + "group by ([%d, %d), %dns)";
+//
+// public static Session session;
+//
+// public static void main(String[] args)
+// throws IoTDBConnectionException, StatementExecutionException, TException {
+// int intervalNum = Integer.parseInt(args[0]);
+// String measurement = "s6";
+// String device = "root.game";
+// session = new Session("127.0.0.1", 6667, "root", "root");
+// session.open(false);
+// SessionDataSet dataSet;
+// long minTime = 0L;
+// long maxTime = 4264605928301L;
+// long interval = (long) Math.ceil((double) (maxTime - minTime) / intervalNum);
+// maxTime = minTime + interval * intervalNum;
+// String sql =
+// String.format(
+// queryFormat,
+// measurement,
+// measurement,
+// measurement,
+// measurement,
+// measurement,
+// measurement,
+// device,
+// minTime,
+// maxTime,
+// interval);
+// dataSet = session.executeQueryStatement(sql);
+// while (dataSet.hasNext()) {
+// RowRecord r = dataSet.next();
+// }
+// session.executeNonQueryStatement("clear cache");
+// dataSet = session.executeFinish();
+// String info = dataSet.getFinishResult();
+// System.out.println(info);
+// dataSet.closeOperationHandle();
+// session.close();
+// }
+//}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
index d9db4b3..8e59d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
@@ -36,5 +36,8 @@ public interface GroupByExecutor {
List<AggregateResult> calcResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException;
+ List<AggregateResult> calcResult4CPV(long curStartTime, long curEndTime, long startTime,
+ long endTime, long interval) throws IOException, QueryProcessException;
+
Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 5b91858..de1c6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -36,19 +44,9 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
private static final Logger logger =
@@ -67,9 +65,12 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
*/
private Map<PartialPath, List<Integer>> resultIndexes = new HashMap<>();
- public GroupByWithoutValueFilterDataSet() {}
+ public GroupByWithoutValueFilterDataSet() {
+ }
- /** constructor. */
+ /**
+ * constructor.
+ */
public GroupByWithoutValueFilterDataSet(QueryContext context, GroupByTimePlan groupByTimePlan)
throws StorageEngineException, QueryProcessException {
super(context, groupByTimePlan);
@@ -144,7 +145,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
GroupByExecutor executor = pathToExecutorEntry.getValue();
// long start = System.nanoTime();
- List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime);
+ List<AggregateResult> aggregations = executor
+ .calcResult4CPV(curStartTime, curEndTime, startTime, endTime, interval);
// IOMonitor.incTotalTime(System.nanoTime() - start);
for (int i = 0; i < aggregations.size(); i++) {
int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
@@ -192,7 +194,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
- return new LocalGroupByExecutor(
+ return new LocalGroupByExecutor4CPV(
path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 9b5f970..5670df0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -184,6 +184,12 @@ public class LocalGroupByExecutor implements GroupByExecutor {
}
@Override
+ public List<AggregateResult> calcResult4CPV(long curStartTime, long curEndTime, long startTime,
+ long endTime, long interval) throws IOException, QueryProcessException {
+ throw new IOException("no implemented");
+ }
+
+ @Override
public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {
// clear result cache
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
new file mode 100644
index 0000000..22b4836
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)
+ * FROM root.xx group by ([tqs,tqe),IntervalLength).
+ * Requirements:
+ * (1) Don't change the sequence of the above six aggregates
+ * (2) Make sure (tqe-tqs) is divisible by IntervalLength.
+ * (3) Assume each chunk has only one page.
+ */
+public class LocalGroupByExecutor4CPV implements GroupByExecutor {
+
+ // Aggregate result buffer of this path
+ private final List<AggregateResult> results = new ArrayList<>();
+// private final TimeRange timeRange;
+
+ private List<ChunkSuit4CPV> currentChunkList;
+ private final List<ChunkSuit4CPV> futureChunkList = new ArrayList<>();
+
+ private Filter timeFilter;
+
+ private TSDataType tsDataType;
+
+ private PriorityMergeReader mergeReader;
+
+  /**
+   * Creates the executor and eagerly collects the chunk metadata of the series into
+   * futureChunkList; no chunk data is read here.
+   *
+   * @param path series to aggregate
+   * @param allSensors forwarded to the SeriesReader
+   * @param dataType data type of the series; also selects the Statistics class used when merging
+   * @param context query context
+   * @param timeFilter time filter of the query, may be adjusted by TTL
+   * @param fileFilter forwarded to the SeriesReader
+   * @param ascending forwarded to the SeriesReader
+   * @throws QueryProcessException if the chunk metadata cannot be loaded
+   */
+  public LocalGroupByExecutor4CPV(
+      PartialPath path,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+
+    this.tsDataType = dataType;
+    this.mergeReader = new PriorityMergeReader();
+
+    // get all data sources.
+    // Use the constructor argument: the field this.timeFilter is only assigned below and is
+    // still null at this point.
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+
+    // update filter by TTL
+    this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // Read metadata with the TTL-adjusted filter, the same one that is later passed to
+    // FileLoaderUtils.loadPageReaderList when chunks are loaded.
+    SeriesReader seriesReader = new SeriesReader(path, allSensors, dataType, context,
+        queryDataSource,
+        this.timeFilter, null, fileFilter, ascending);
+
+    // load all chunk metadatas into futureChunkList
+    try {
+      futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV());
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+ /**
+  * Registers one aggregate to be filled by this executor. Aggregates are kept, and later
+  * returned, in registration order; calcResult4CPV addresses them by position.
+  */
+ @Override
+ public void addAggregateResult(AggregateResult aggrResult) {
+ results.add(aggrResult);
+ }
+
+  /**
+   * Computes the aggregates of one group-by interval [curStartTime, curEndTime), using chunk
+   * statistics where possible and loading chunk data only when a candidate point has to be
+   * verified.
+   *
+   * <p>The registered results are addressed by position: 0 min_time, 1 max_time, 2 first_value,
+   * 3 last_value, 4 min_value, 5 max_value.
+   *
+   * @param curStartTime closed
+   * @param curEndTime open
+   * @param startTime closed
+   * @param endTime open
+   * @param interval length of one group-by interval, forwarded to PageReader.split4CPV
+   * @return the internal result list; the results stay reset if no chunk belongs to the interval
+   */
+  @Override
+  public List<AggregateResult> calcResult4CPV(long curStartTime, long curEndTime, long startTime,
+      long endTime, long interval) throws IOException, QueryProcessException {
+    System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime + ")");
+
+    // clear result cache
+    for (AggregateResult result : results) {
+      result.reset();
+    }
+    // empty currentChunkList
+    currentChunkList = new ArrayList<>();
+
+    System.out.println("====DEBUG====: deal with futureChunkList");
+    assignChunksToCurrentInterval(curStartTime, curEndTime, startTime, endTime, interval);
+
+    System.out.println("====DEBUG====: deal with currentChunkList");
+
+    // isFinal[k] becomes true once point k is confirmed: 0 first, 1 last, 2 bottom, 3 top
+    boolean[] isFinal = new boolean[4];
+    // The emptiness check is repeated on every round because a reload may leave the list empty
+    // (presumably when split4CPV finds no remaining point); indexing it would then fail.
+    while (!currentChunkList.isEmpty()) {
+      long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime, topTime
+      Object[] values = new Object[4]; // firstValue, lastValue, bottomValue, topValue
+      MergeReaderPriority[] versions = new MergeReaderPriority[4];
+      int[] listIdx = new int[4];
+      timestamps[0] = -1;
+      timestamps[1] = -1;
+
+      System.out.println("====DEBUG====: find candidate points");
+      findCandidatePoints(isFinal, timestamps, values, versions, listIdx);
+
+      System.out.println("====DEBUG====: verify candidate points");
+
+      // Whenever a verification changes currentChunkList, the candidates are stale and the
+      // search restarts from the top of the loop.
+      if (!isFinal[0]) {
+        if (!verifyBoundaryPoint(0, "firstPoint", timestamps, values, listIdx,
+            startTime, endTime, interval, curStartTime)) {
+          continue;
+        }
+        isFinal[0] = true;
+      }
+      if (!isFinal[1]) {
+        if (!verifyBoundaryPoint(1, "lastPoint", timestamps, values, listIdx,
+            startTime, endTime, interval, curStartTime)) {
+          continue;
+        }
+        isFinal[1] = true;
+      }
+      if (!isFinal[2]) {
+        if (!verifyExtremePoint(2, "bottomPoint", timestamps, values, versions, listIdx,
+            startTime, endTime, interval, curStartTime)) {
+          continue;
+        }
+        isFinal[2] = true;
+      }
+      if (!isFinal[3]) {
+        if (!verifyExtremePoint(3, "topPoint", timestamps, values, versions, listIdx,
+            startTime, endTime, interval, curStartTime)) {
+          continue;
+        }
+        isFinal[3] = true;
+        // topPoint is verified last, so all four points are final here
+        return results;
+      }
+    }
+    return results;
+  }
+
+  /** Distributes futureChunkList: chunks of the current interval move to currentChunkList. */
+  private void assignChunksToCurrentInterval(long curStartTime, long curEndTime, long startTime,
+      long endTime, long interval) throws IOException {
+    List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+    ListIterator<ChunkSuit4CPV> itr = futureChunkList.listIterator();
+    while (itr.hasNext()) {
+      ChunkSuit4CPV chunkSuit4CPV = itr.next();
+      ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+      long chunkMinTime = chunkMetadata.getStartTime();
+      long chunkMaxTime = chunkMetadata.getEndTime();
+      if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
+        // the chunk falls on the right side of the current M4 interval Ii: keep it for later
+        continue;
+      } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
+        // the chunk falls on the left side of the current M4 interval Ii
+        // or the chunk falls on the right side of the total query range
+        itr.remove();
+      } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
+        // the chunk falls completely within the current M4 interval Ii
+        currentChunkList.add(chunkSuit4CPV);
+        itr.remove();
+      } else {
+        // the chunk partially overlaps in time with the current M4 interval Ii.
+        // load this chunk, split it on deletes and all w intervals.
+        // add to currentChunkList and futureChunkList.
+        itr.remove();
+        reloadAndSplit(chunkMetadata, startTime, endTime, interval, curStartTime,
+            currentChunkList, tmpFutureChunkList);
+
+        System.out.println(
+            "====DEBUG====: load the chunk because overlaps the M4 interval. Version="
+                + chunkMetadata.getVersion() + " " + chunkMetadata.getOffsetOfChunkHeader());
+      }
+    }
+    // appended after the iteration so that the iterator is not invalidated
+    futureChunkList.addAll(tmpFutureChunkList);
+  }
+
+  /** Loads the pages of a chunk and lets split4CPV append the pieces to the given lists. */
+  private void reloadAndSplit(ChunkMetadata chunkMetadata, long startTime, long endTime,
+      long interval, long curStartTime, List<ChunkSuit4CPV> currentTarget,
+      List<ChunkSuit4CPV> futureTarget) throws IOException {
+    List<IPageReader> pageReaderList =
+        FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter);
+    for (IPageReader pageReader : pageReaderList) {
+      // assume only one page in a chunk
+      // assume all data on disk, no data in memory
+      ((PageReader) pageReader)
+          .split4CPV(startTime, endTime, interval, curStartTime, currentTarget, futureTarget,
+              chunkMetadata);
+    }
+  }
+
+  /** Picks, from chunk statistics only, the candidate for every point that is not final yet. */
+  private void findCandidatePoints(boolean[] isFinal, long[] timestamps, Object[] values,
+      MergeReaderPriority[] versions, int[] listIdx) {
+    for (int j = 0; j < currentChunkList.size(); j++) {
+      ChunkMetadata chunkMetadata = currentChunkList.get(j).getChunkMetadata();
+      Statistics statistics = chunkMetadata.getStatistics();
+      MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(),
+          chunkMetadata.getOffsetOfChunkHeader());
+      // update firstPoint: earliest start time, ties go to the higher priority
+      if (!isFinal[0]) {
+        if (timestamps[0] == -1
+            || (statistics.getStartTime() < timestamps[0])
+            || (statistics.getStartTime() == timestamps[0]
+            && version.compareTo(versions[0]) > 0)) {
+          timestamps[0] = statistics.getStartTime();
+          values[0] = statistics.getFirstValue();
+          versions[0] = version;
+          listIdx[0] = j;
+        }
+      }
+      // update lastPoint: latest end time, ties go to the higher priority
+      if (!isFinal[1]) {
+        if (timestamps[1] == -1
+            || (statistics.getEndTime() > timestamps[1])
+            || (statistics.getEndTime() == timestamps[1]
+            && version.compareTo(versions[1]) > 0)) {
+          timestamps[1] = statistics.getEndTime();
+          values[1] = statistics.getLastValue();
+          versions[1] = version;
+          listIdx[1] = j;
+        }
+      }
+      // update bottomPoint
+      if (!isFinal[2]) {
+        if (values[2] == null
+            || (((Comparable) (values[2])).compareTo(statistics.getMinValue()) > 0)) {
+          timestamps[2] = statistics.getBottomTimestamp();
+          values[2] = statistics.getMinValue();
+          versions[2] = version;
+          listIdx[2] = j;
+        }
+      }
+      // update topPoint
+      if (!isFinal[3]) {
+        if (values[3] == null
+            || (((Comparable) (values[3])).compareTo(statistics.getMaxValue()) < 0)) {
+          timestamps[3] = statistics.getTopTimestamp();
+          values[3] = statistics.getMaxValue();
+          versions[3] = version;
+          listIdx[3] = j;
+        }
+      }
+    }
+  }
+
+  /** Returns true if the timestamp lies inside one of the delete intervals of the chunk. */
+  private static boolean isDeleted(ChunkMetadata chunkMetadata, long timestamp) {
+    List<TimeRange> deleteIntervalList = chunkMetadata.getDeleteIntervalList();
+    if (deleteIntervalList == null) {
+      return false;
+    }
+    for (TimeRange timeRange : deleteIntervalList) {
+      if (timeRange.contains(timestamp)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * If the candidate point is deleted, replaces its chunk in currentChunkList by the pieces
+   * produced from the loaded chunk and returns true; otherwise changes nothing.
+   */
+  private boolean reloadIfDeleted(int pointIdx, String pointName, long[] timestamps,
+      int[] listIdx, long startTime, long endTime, long interval, long curStartTime)
+      throws IOException {
+    ChunkMetadata chunkMetadata = currentChunkList.get(listIdx[pointIdx]).getChunkMetadata();
+    if (!isDeleted(chunkMetadata, timestamps[pointIdx])) {
+      return false;
+    }
+    System.out.println(
+        "====DEBUG====: load the chunk because candidate " + pointName
+            + " is actually deleted. Version="
+            + chunkMetadata.getVersion() + " " + chunkMetadata.getOffsetOfChunkHeader());
+
+    // Remove the chunk the candidate came from (index of THIS point, not of firstPoint).
+    currentChunkList.remove(listIdx[pointIdx]);
+    reloadAndSplit(chunkMetadata, startTime, endTime, interval, curStartTime, currentChunkList,
+        null);
+    return true;
+  }
+
+  /** Publishes one confirmed point into the result at the given position. */
+  private void publishPoint(int resultIdx, int pointIdx, long[] timestamps, Object[] values) {
+    results.get(resultIdx).updateResultUsingValues(
+        Arrays.copyOfRange(timestamps, pointIdx, pointIdx + 1), 1,
+        Arrays.copyOfRange(values, pointIdx, pointIdx + 1));
+  }
+
+  /**
+   * Verifies the firstPoint (pointIdx 0) or lastPoint (pointIdx 1) candidate. Such a candidate
+   * is valid unless it is deleted.
+   *
+   * @return true if the point was published, false if currentChunkList changed and the
+   *     candidates must be searched again
+   */
+  private boolean verifyBoundaryPoint(int pointIdx, String pointName, long[] timestamps,
+      Object[] values, int[] listIdx, long startTime, long endTime, long interval,
+      long curStartTime) throws IOException, QueryProcessException {
+    if (reloadIfDeleted(pointIdx, pointName, timestamps, listIdx, startTime, endTime, interval,
+        curStartTime)) {
+      return false;
+    }
+    publishPoint(pointIdx, pointIdx, timestamps, values); // min_time (0) or max_time (1)
+    publishPoint(pointIdx + 2, pointIdx, timestamps, values); // first_value (2) or last_value (3)
+    System.out.println("====DEBUG====: find " + pointName);
+    return true;
+  }
+
+  /**
+   * Verifies the bottomPoint (pointIdx 2) or topPoint (pointIdx 3) candidate: it must not be
+   * deleted and must not be covered by a chunk of higher priority that was not merged yet.
+   *
+   * @return true if the point was published, false if chunks were reloaded or merged and the
+   *     candidates must be searched again
+   */
+  private boolean verifyExtremePoint(int pointIdx, String pointName, long[] timestamps,
+      Object[] values, MergeReaderPriority[] versions, int[] listIdx, long startTime,
+      long endTime, long interval, long curStartTime)
+      throws IOException, QueryProcessException {
+    if (reloadIfDeleted(pointIdx, pointName, timestamps, listIdx, startTime, endTime, interval,
+        curStartTime)) {
+      return false;
+    }
+    List<Integer> toMerge =
+        findOverlappingNewerChunks(listIdx[pointIdx], timestamps[pointIdx], versions[pointIdx]);
+    if (toMerge.isEmpty()) {
+      publishPoint(pointIdx + 2, pointIdx, timestamps, values); // min_value (4) or max_value (5)
+      System.out.println("====DEBUG====: find " + pointName);
+      return true;
+    }
+    // deal with toMerge chunks: delete updated points
+    toMerge.add(listIdx[pointIdx]);
+    mergeChunks(toMerge, pointName, startTime, endTime, interval, curStartTime);
+    return false;
+  }
+
+  /**
+   * Returns the indexes of the chunks in currentChunkList that have a higher priority than the
+   * candidate, span the candidate timestamp and are not yet marked as merged with its chunk.
+   */
+  private List<Integer> findOverlappingNewerChunks(int candidateListIdx, long timestamp,
+      MergeReaderPriority candidateVersion) {
+    ChunkSuit4CPV candidate = currentChunkList.get(candidateListIdx);
+    List<Long> mergedVersionList = candidate.getMergeVersionList();
+    List<Long> mergedOffsetList = candidate.getMergeOffsetList();
+    List<Integer> toMerge = new ArrayList<>();
+    for (int i = 0; i < currentChunkList.size(); i++) {
+      ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata();
+      MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(),
+          chunkMetadata.getOffsetOfChunkHeader());
+      if (version.compareTo(candidateVersion) <= 0) { // including the candidate's own chunk
+        continue;
+      }
+      if (timestamp < chunkMetadata.getStartTime() || timestamp > chunkMetadata.getEndTime()) {
+        continue;
+      }
+      boolean isMerged = false;
+      for (int k = 0; k < mergedVersionList.size(); k++) {
+        // these chunks are MARKED "merged" - not overlapped any more.
+        // The merged lists are indexed by k; they are unrelated to the chunk index i.
+        if (mergedVersionList.get(k).longValue() == chunkMetadata.getVersion()
+            && mergedOffsetList.get(k).longValue() == chunkMetadata.getOffsetOfChunkHeader()) {
+          isMerged = true;
+          break;
+        }
+      }
+      if (!isMerged) {
+        toMerge.add(i);
+      }
+    }
+    return toMerge;
+  }
+
+  /** Creates an empty Statistics object for the series data type, or null if unsupported. */
+  private Statistics newEmptyStatistics() {
+    switch (tsDataType) {
+      case INT32:
+        return new IntegerStatistics();
+      case INT64:
+        return new LongStatistics();
+      case FLOAT:
+        return new FloatStatistics();
+      case DOUBLE:
+        return new DoubleStatistics();
+      default:
+        return null;
+    }
+  }
+
+  /** Feeds one point into the statistics using the overload of the series data type. */
+  private void updateStatistics(Statistics statistics, TimeValuePair point) {
+    switch (tsDataType) {
+      case INT32:
+        statistics.update(point.getTimestamp(), (int) point.getValue().getValue());
+        break;
+      case INT64:
+        statistics.update(point.getTimestamp(), (long) point.getValue().getValue());
+        break;
+      case FLOAT:
+        statistics.update(point.getTimestamp(), (float) point.getValue().getValue());
+        break;
+      case DOUBLE:
+        statistics.update(point.getTimestamp(), (double) point.getValue().getValue());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(tsDataType));
+    }
+  }
+
+  /**
+   * Merges the given chunks of currentChunkList through the priority merge reader, then rebuilds
+   * the batch data and statistics of each chunk from the points attributed to it and marks the
+   * chunks as merged with each other.
+   */
+  private void mergeChunks(List<Integer> toMerge, String pointName, long startTime,
+      long endTime, long interval, long curStartTime) throws IOException {
+    List<Long> newMergedVersionList = new ArrayList<>();
+    List<Long> newMergedOffsetList = new ArrayList<>();
+    for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any more
+      ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata();
+      newMergedVersionList.add(tmpChunkMetadata.getVersion());
+      newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
+    }
+    Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>();
+    Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>();
+    for (int chunkIdx : toMerge) {
+      ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(chunkIdx);
+      ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+      MergeReaderPriority mergeReaderPriority = new MergeReaderPriority(
+          chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+      // create empty batchData and empty statistics, filled during the merge below
+      updateBatchDataMap.put(mergeReaderPriority,
+          BatchDataFactory.createBatchData(tsDataType, true, false));
+      statisticsMap.put(mergeReaderPriority, newEmptyStatistics());
+      // prepare mergeReader: the chunk data is loaded only if it is not in memory yet
+      if (chunkSuit4CPV.getBatchData() == null) {
+        List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+        reloadAndSplit(chunkMetadata, startTime, endTime, interval, curStartTime,
+            tmpCurrentChunkList, null);
+        // NOTE(review): assumes split4CPV yields at least one piece here - confirm
+        currentChunkList.set(chunkIdx, tmpCurrentChunkList.get(0));
+        chunkSuit4CPV = currentChunkList.get(chunkIdx);
+
+        System.out.println(
+            "====DEBUG====: load chunk for update merge. Version=" + chunkMetadata
+                .getVersion() + " " + chunkMetadata.getOffsetOfChunkHeader());
+      }
+      mergeReader.addReader(chunkSuit4CPV.getBatchData().getBatchDataIterator(),
+          new MergeReaderPriority(chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset()));
+    }
+    while (mergeReader.hasNextTimeValuePair()) {
+      Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement();
+      TimeValuePair ret = res.left;
+      System.out.println(
+          "====DEBUG====: merge for " + pointName + ". (t,v)=" + ret.getTimestamp() + "," + ret
+              .getValue().getValue());
+      // each returned point is credited to the reader (chunk) it came from
+      updateBatchDataMap.get(res.right)
+          .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
+      updateStatistics(statisticsMap.get(res.right), ret);
+    }
+    mergeReader.close();
+
+    for (int chunkIdx : toMerge) {
+      ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(chunkIdx);
+      // to MARK these chunks are "merged" - not overlapped any more
+      chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
+      chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
+      // update BatchData and statistics
+      MergeReaderPriority mergeReaderPriority = new MergeReaderPriority(
+          chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset());
+      chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
+      chunkSuit4CPV.getChunkMetadata()
+          .setStatistics(statisticsMap.get(mergeReaderPriority));
+    }
+    System.out.println(
+        "====DEBUG====: merged chunks are : version=" + newMergedVersionList + " offsets="
+            + newMergedOffsetList);
+  }
+
+
+ /** Not provided by the CPV executor: the call always fails with an IOException. */
+ @Override
+ public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+ throws IOException {
+ throw new IOException("no implemented");
+ }
+
+ /**
+  * Not provided by the CPV executor: the call always fails with an IOException. The CPV path is
+  * calcResult4CPV, which additionally needs the total query range and the interval length.
+  */
+ @Override
+ public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+ throws IOException, QueryProcessException {
+ throw new IOException("no implemented");
+ }
+
+ /**
+  * Returns the internal list (not a copy) of chunks assigned to the interval of the latest
+  * calcResult4CPV call; null before the first call.
+  */
+ public List<ChunkSuit4CPV> getCurrentChunkList() {
+ return currentChunkList;
+ }
+
+ /** Returns the internal list (not a copy) of chunks not yet assigned to an interval. */
+ public List<ChunkSuit4CPV> getFutureChunkList() {
+ return futureChunkList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index b76d64b..e16d94f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.series;
+import java.util.ArrayList;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -40,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
@@ -215,6 +217,65 @@ public class SeriesReader {
return !(hasNextPage() || hasNextChunk() || hasNextFile());
}
+  /**
+   * Gets the TimeseriesMetadata of every unsequence and sequence resource, applies the
+   * modifications on the chunk metadata (i.e., assigns deleteIntervals) and packs every chunk
+   * metadata into a ChunkSuit4CPV.
+   *
+   * <p>Resources for which no TimeseriesMetadata is loaded are skipped.
+   *
+   * @return the chunks of all unsequence resources followed by those of all sequence resources
+   */
+  public List<ChunkSuit4CPV> getAllChunkMetadatas4CPV() throws IOException {
+    List<ChunkSuit4CPV> chunkSuit4CPVList = new ArrayList<>();
+    while (orderUtils.hasNextUnseqResource()) {
+      TimeseriesMetadata timeseriesMetadata =
+          FileLoaderUtils.loadTimeSeriesMetadata(
+              orderUtils.getNextUnseqFileResource(true),
+              seriesPath,
+              context,
+              getAnyFilter(),
+              allSensors);
+      // the loader may return null; the unpack helper dereferences its argument, so it is
+      // only called for loaded metadata
+      if (timeseriesMetadata != null) {
+        timeseriesMetadata.setModified(true);
+        timeseriesMetadata.setSeq(false);
+        unpackOneTimeSeriesMetadata4CPV(timeseriesMetadata, chunkSuit4CPVList);
+      }
+    }
+    while (orderUtils.hasNextSeqResource()) {
+      TimeseriesMetadata timeseriesMetadata =
+          FileLoaderUtils.loadTimeSeriesMetadata(
+              orderUtils.getNextSeqFileResource(true),
+              seriesPath,
+              context,
+              getAnyFilter(),
+              allSensors);
+      if (timeseriesMetadata != null) {
+        timeseriesMetadata.setSeq(true);
+        unpackOneTimeSeriesMetadata4CPV(timeseriesMetadata, chunkSuit4CPVList);
+      }
+    }
+    return chunkSuit4CPVList;
+  }
+
+  /**
+   * Loads the chunk metadata of one TimeseriesMetadata, tags every entry with the seq flag of
+   * its series and appends it, wrapped in a ChunkSuit4CPV, to the given list. When performance
+   * tracing is enabled, the chunk count and point count are reported to the TracingManager.
+   */
+  private void unpackOneTimeSeriesMetadata4CPV(TimeseriesMetadata timeSeriesMetadata,
+      List<ChunkSuit4CPV> chunkSuit4CPVList)
+      throws IOException {
+    List<ChunkMetadata> loadedChunks = FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
+    for (ChunkMetadata loaded : loadedChunks) {
+      loaded.setSeq(timeSeriesMetadata.isSeq());
+    }
+
+    // try to calculate the total number of chunk and time-value points in chunk
+    if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
+      long pointTotal = 0;
+      for (ChunkMetadata loaded : loadedChunks) {
+        pointTotal += loaded.getStatistics().getCount();
+      }
+      TracingManager.getInstance()
+          .getTracingInfo(context.getQueryId())
+          .addChunkInfo(loadedChunks.size(), pointTotal);
+    }
+
+    for (ChunkMetadata loaded : loadedChunks) {
+      chunkSuit4CPVList.add(new ChunkSuit4CPV(loaded));
+    }
+  }
+
boolean hasNextFile() throws IOException {
QueryTimeManager.checkQueryAlive(context.getQueryId());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 5a61c23..d321074 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
+import org.apache.iotdb.tsfile.utils.Pair;
/** This class implements {@link IPointReader} for data sources with different priorities. */
@SuppressWarnings("ConstantConditions") // heap is ensured by hasNext non-empty
@@ -118,6 +119,33 @@ public class PriorityMergeReader implements IPointReader {
return ret;
}
+  /**
+   * Registers a reader under an explicitly given priority. A reader without any point is closed
+   * right away and not registered.
+   */
+  public void addReader(IPointReader reader, MergeReaderPriority priority) throws IOException {
+    if (!reader.hasNextTimeValuePair()) {
+      reader.close();
+      return;
+    }
+    TimeValuePair firstPair = reader.nextTimeValuePair();
+    heap.add(new Element(reader, firstPair, priority));
+  }
+
+  /**
+   * Polls the top element of the heap and returns its point together with the priority of the
+   * reader it came from, then advances the heap for the next call.
+   *
+   * <p>The heap must not be empty; callers check hasNextTimeValuePair() first.
+   *
+   * @return pair of (point, priority of its source reader)
+   */
+  public Pair<TimeValuePair, MergeReaderPriority> nextElement() throws IOException {
+    Element top = heap.poll();
+    TimeValuePair ret = top.getTimeValuePair();
+    // parameterized instead of raw Pair: no unchecked conversion on return
+    Pair<TimeValuePair, MergeReaderPriority> res = new Pair<>(ret, top.priority);
+    TimeValuePair topNext = null;
+    if (top.hasNext()) {
+      top.next();
+      topNext = top.currPair();
+    }
+    long topNextTime = topNext == null ? Long.MAX_VALUE : topNext.getTimestamp();
+    updateHeap(ret.getTimestamp(), topNextTime);
+    // re-queue the source reader if it still has points
+    if (topNext != null) {
+      top.timeValuePair = topNext;
+      heap.add(top);
+    }
+    return res;
+  }
+
@Override
public TimeValuePair currentTimeValuePair() throws IOException {
return heap.peek().getTimeValuePair();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java
new file mode 100644
index 0000000..97838d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest1.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyCPVTest1 {
+
+ // Column label used to read the window start time from each result row.
+ private static final String TIMESTAMP_STR = "Time";
+
+ // Schema statements executed at the start of every prepareData* method.
+ private static String[] creationSqls =
+ new String[]{
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ };
+
+ // Full path of the series under test; used to build the aggregate column labels.
+ private final String d0s0 = "root.vehicle.d0.s0";
+
+ // Insert statement template; the two %d placeholders are the timestamp and the s0 value.
+ private static final String insertTemplate =
+ "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+  /**
+   * Starts a fresh environment before every test.
+   *
+   * <p>Each test creates the same storage group and series and writes its own data set into it,
+   * so the environment must be per test rather than per class; otherwise data from an earlier
+   * test leaks into later ones and re-creating the schema fails.
+   *
+   * @throws Exception if the environment cannot be set up or the JDBC driver cannot be loaded
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Compaction is switched off for the duration of the test.
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  /**
+   * Cleans the environment after every test and switches the compaction strategy back to
+   * {@code LEVEL_COMPACTION}.
+   *
+   * @throws Exception if cleaning the environment fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  /**
+   * Runs the six-aggregate group-by query over the data written by {@code prepareData1()} and
+   * compares every 25ms window in [0,100) with the expected row.
+   *
+   * <p>Each row is rendered as
+   * {@code Time,min_time,max_time,first_value,last_value,min_value,max_value}.
+   */
+  @Test
+  public void test1() {
+    prepareData1();
+
+    String[] res =
+        new String[] {
+          "0,1,20,5,20,5[1],30[10]",
+          "25,25,45,8,30,8[25],40[30]",
+          "50,52,54,8,18,8[52],18[54]",
+          "75,null,null,null,null,null,null"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the schema and writes data set 1: four batches of inserts with interleaving
+   * timestamps, each batch followed by a {@code FLUSH}. Timestamps 20 and 54 are written twice
+   * with different values.
+   *
+   * <p>Any failure aborts the calling test instead of being printed and ignored, so a broken
+   * preparation is not reported later as a wrong query result.
+   */
+  private static void prepareData1() {
+    // data: https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18));
+      statement.execute("FLUSH");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Same query as {@code test1()}, run over data set 2, which adds a delete of [28,60] after all
+   * writes.
+   *
+   * <p>In window [25,50) only the points (25,8) and (27,20) survive the delete, so the expected
+   * minimum is {@code 8[25]}; this is consistent with the expected {@code min_time}=25 and
+   * {@code first_value}=8 of the same row.
+   */
+  @Test
+  public void test2() { // add deletes
+    prepareData2();
+
+    String[] res =
+        new String[] {
+          "0,1,20,5,20,5[1],30[10]",
+          "25,25,27,8,20,8[25],20[27]",
+          "50,null,null,null,null,null,null",
+          "75,null,null,null,null,null,null"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the schema and writes data set 2: the same four flushed batches as data set 1,
+   * followed by a delete of the time range [28,60].
+   *
+   * <p>Any failure aborts the calling test instead of being printed and ignored, so a broken
+   * preparation is not reported later as a wrong query result.
+   */
+  private static void prepareData2() {
+    // data: https://user-images.githubusercontent.com/33376433/151995378-07a2f8df-5cac-499a-ae88-e3b017eee07a.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18));
+      statement.execute("FLUSH");
+
+      statement.execute("delete from root.vehicle.d0.s0 where time>=28 and time<=60");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Uses data set 2 like {@code test2()}, but widens the query range from [0,100) to [0,150), so
+   * the point at timestamp 120 is covered by the fifth window.
+   *
+   * <p>In window [25,50) only the points (25,8) and (27,20) survive the delete, so the expected
+   * minimum is {@code 8[25]}; this is consistent with the expected {@code min_time}=25 and
+   * {@code first_value}=8 of the same row.
+   */
+  @Test
+  public void test2_2() { // use data2 but change the sql from group by ([0,100),25ms) to group by ([0,150),25ms)
+    prepareData2();
+
+    String[] res =
+        new String[] {
+          "0,1,20,5,20,5[1],30[10]",
+          "25,25,27,8,20,8[25],20[27]",
+          "50,null,null,null,null,null,null",
+          "75,null,null,null,null,null,null",
+          "100,120,120,8,8,8[120],8[120]",
+          "125,null,null,null,null,null,null"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,150),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs the six-aggregate group-by query over data set 3, whose four batches are written in
+   * increasing time order, and compares every 25ms window in [0,100) with the expected row.
+   */
+  @Test
+  public void test3() { // all seq
+    prepareData3();
+
+    String[] res =
+        new String[] {
+          "0,1,22,5,4,1[10],10[2]",
+          "25,30,40,8,2,2[40],8[30]",
+          "50,55,72,5,4,4[72],20[62]",
+          "75,80,90,11,1,1[90],11[80]"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the schema and writes data set 3: four batches with strictly increasing,
+   * non-overlapping timestamps, each batch followed by a {@code FLUSH}.
+   *
+   * <p>Any failure aborts the calling test instead of being printed and ignored, so a broken
+   * preparation is not reported later as a wrong query result.
+   */
+  private static void prepareData3() {
+    // data: https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7));
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Same points and expected rows as {@code test3()}, but the data comes from
+   * {@code prepareData3_2()}, which writes the latest batch first.
+   */
+  @Test
+  public void test3_2() { // not seq but no overlap
+    prepareData3_2();
+
+    String[] res =
+        new String[] {
+          "0,1,22,5,4,1[10],10[2]",
+          "25,30,40,8,2,2[40],8[30]",
+          "50,55,72,5,4,4[72],20[62]",
+          "75,80,90,11,1,1[90],11[80]"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the schema and writes the same points as data set 3, but with the batch holding the
+   * largest timestamps (72..105) written and flushed first, followed by the other three batches
+   * in time order. The batches do not overlap in time.
+   *
+   * <p>Any failure aborts the calling test instead of being printed and ignored, so a broken
+   * preparation is not reported later as a wrong query result.
+   */
+  private static void prepareData3_2() {
+    // data: https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18));
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs the six-aggregate group-by query over data set 4, in which the delete of [28,42] is
+   * issued before the last batch is written, so the later insert at timestamp 33 is not removed
+   * by it.
+   */
+  @Test
+  public void test4() { // delete sequence move forward
+    prepareData4();
+
+    String[] res =
+        new String[] {
+          "0,1,20,5,20,5[1],30[10]",
+          "25,25,45,8,30,8[25],30[45]",
+          "50,52,54,8,18,8[52],18[54]",
+          "75,null,null,null,null,null,null"
+        };
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0));
+          System.out.println(ans);
+          Assert.assertEquals(res[i++], ans);
+        }
+        // The loop alone would accept an empty or truncated result set.
+        Assert.assertEquals(res.length, i);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Creates the schema and writes data set 4: the first three flushed batches of data set 1, then
+   * a delete of [28,42], then the fourth batch (including timestamp 33) and a final
+   * {@code FLUSH}.
+   *
+   * <p>Any failure aborts the calling test instead of being printed and ignored, so a broken
+   * preparation is not reported later as a wrong query result.
+   */
+  private static void prepareData4() {
+    // data: https://user-images.githubusercontent.com/33376433/152006061-f1d95952-3f5c-4d88-b34e-45d3bb61b600.png
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute("delete from root.vehicle.d0.s0 where time>=28 and time<=42");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18));
+      statement.execute("FLUSH");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java
similarity index 51%
rename from server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
rename to server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java
index 79996df..aba4083 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyCPVTest2.java
@@ -18,32 +18,32 @@
*/
package org.apache.iotdb.db.integration.m4;
+import java.sql.DriverManager;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
-
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Locale;
import static org.junit.Assert.fail;
-public class MyTestM4 {
+public class MyCPVTest2 {
private static final String TIMESTAMP_STR = "Time";
private static String[] creationSqls =
- new String[] {
- "SET STORAGE GROUP TO root.vehicle.d0",
- "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ new String[]{
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
};
private final String d0s0 = "root.vehicle.d0.s0";
@@ -51,14 +51,17 @@ public class MyTestM4 {
private static final String insertTemplate =
"INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+ private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+ private static int avgSeriesPointNumberThreshold;
+
@BeforeClass
public static void setUp() throws Exception {
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ avgSeriesPointNumberThreshold = ioTDBConfig.getAvgSeriesPointNumberThreshold();
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
- prepareData1();
}
@AfterClass
@@ -67,80 +70,46 @@ public class MyTestM4 {
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ ioTDBConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
}
- /**
- * MAC: merge all chunks. Use UDTF to mimic the process of merging all chunks to calculate
- * aggregation points.
- */
@Test
- public void testMAC() {
- try (Connection connection =
- DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- long tqs = 0L;
- long tqe = 100L;
- int w = 4;
- boolean hasResultSet =
- statement.execute(
- String.format(
- "select M4(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where "
- + "time>=%1$d and time<%2$d",
- tqs, tqe, w));
-
- Assert.assertTrue(hasResultSet);
- int cnt;
- try (ResultSet resultSet = statement.getResultSet()) {
- cnt = 0;
- while (resultSet.next()) {
- String ans =
- resultSet.getString(TIMESTAMP_STR)
- + ","
- + resultSet.getString(
- "M4(root.vehicle.d0.s0, \"tqs\"=\"0\", \"tqe\"=\"100\", \"w\"=\"4\")");
- System.out.println(ans);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
+ public void test1() {
+ prepareData1();
- /** MOC: merge overlapping chunks. This is what IoTDB does. */
- @Test
- public void testMOC() {
+ String[] res = new String[]{
+ "0,1,20,5,20,5[1],30[10]",
+ "25,27,45,20,30,9[33],40[30]",
+ "50,52,54,8,18,8[52],18[54]",
+ "75,null,null,null,null,null,null"};
try (Connection connection =
- DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
boolean hasResultSet =
statement.execute(
- "SELECT min_time(s0),first_value(s0),"
- + "max_time(s0), last_value(s0),"
- + "max_value(s0), min_value(s0)"
- + " FROM root.vehicle.d0 group by ([0,100),25ms)");
+ "SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), max_value(s0)"
+ + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the sequence!!!
Assert.assertTrue(hasResultSet);
- int cnt;
try (ResultSet resultSet = statement.getResultSet()) {
- cnt = 0;
+ int i = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
+ ","
+ resultSet.getString(String.format("min_time(%s)", d0s0))
+ ","
- + resultSet.getString(String.format("first_value(%s)", d0s0))
- + ","
+ resultSet.getString(String.format("max_time(%s)", d0s0))
+ ","
+ + resultSet.getString(String.format("first_value(%s)", d0s0))
+ + ","
+ resultSet.getString(String.format("last_value(%s)", d0s0))
+ ","
- + resultSet.getString(String.format("max_value(%s)", d0s0))
+ + resultSet.getString(String.format("min_value(%s)", d0s0))
+ ","
- + resultSet.getString(String.format("min_value(%s)", d0s0));
+ + resultSet.getString(String.format("max_value(%s)", d0s0));
System.out.println(ans);
+ Assert.assertEquals(res[i++], ans);
}
}
} catch (Exception e) {
@@ -149,56 +118,41 @@ public class MyTestM4 {
}
}
- /**
- * The data written is shown in figure:
- * https://user-images.githubusercontent.com/33376433/151664843-6afcb40d-fe6e-4ea8-bdd6-efe467f40c1c.png
- */
private static void prepareData1() {
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ // data:
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
for (String sql : creationSqls) {
statement.execute(sql);
}
- // seq1
- for (int i = 0; i <= 99; i++) {
- statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
- }
- statement.executeBatch();
- statement.clearBatch();
- statement.execute("FLUSH");
+ ioTDBConfig.setSeqTsFileSize(1024 * 1024 * 1024);// 1G
+ ioTDBConfig.setUnSeqTsFileSize(1024 * 1024 * 1024); // 1G
+ ioTDBConfig.setAvgSeriesPointNumberThreshold(4); // this step cannot be omitted
- // unseq2
- for (int i = 10; i <= 28; i++) {
- statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
- }
- statement.executeBatch();
- statement.clearBatch();
- statement.execute("FLUSH");
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8));
- // unseq3
- for (int i = 29; i <= 36; i++) {
- statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
- }
- statement.executeBatch();
- statement.clearBatch();
- statement.execute("FLUSH");
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 5, 10));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 8, 8));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 30));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 20));
- // unseq4
- for (int i = 37; i <= 60; i++) {
- statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
- }
- statement.executeBatch();
- statement.clearBatch();
- statement.execute("FLUSH");
-
- statement.execute("delete from root.vehicle.d0.s0 where time>=26 and time<=27");
- statement.execute("delete from root.vehicle.d0.s0 where time>=35 and time<=40");
- statement.execute("delete from root.vehicle.d0.s0 where time>=48 and time<=75");
- statement.execute("delete from root.vehicle.d0.s0 where time>=85");
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 27, 20));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 40));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 35, 10));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 20));
+
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 33, 9));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 45, 30));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 52, 8));
+ statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 18));
+
+ statement.execute("flush");
} catch (Exception e) {
e.printStackTrace();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 5a0bca2..039ca9f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -131,6 +131,10 @@ public class ChunkMetadata {
return tsDataType;
}
+ /**
+ * Replaces the statistics object held by this chunk metadata.
+ *
+ * @param statistics the statistics instance to store
+ */
+ public void setStatistics(Statistics statistics) {
+ this.statistics = statistics;
+ }
+
/**
* serialize to outputStream.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
new file mode 100644
index 0000000..1a983a9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+
+/**
+ * Holder that pairs a chunk's metadata with its data, plus two lists recording the versions and
+ * offsets that have been merged into it.
+ *
+ * <p>The batch data is optional: it is {@code null} when the holder is built from metadata only
+ * and can be attached later via {@link #setBatchData(BatchData)}.
+ */
+public class ChunkSuit4CPV {
+
+  // Author's note kept from the original field comment:
+  // this.version = new MergeReaderPriority(chunkMetadata.getVersion(),
+  // chunkMetadata.getOffsetOfChunkHeader());
+  private ChunkMetadata chunkMetadata;
+  private BatchData batchData;
+  private List<Long> mergeVersionList = new ArrayList<>();
+  private List<Long> mergeOffsetList = new ArrayList<>();
+
+  /**
+   * Creates a holder that carries metadata only; the batch data starts out as {@code null}.
+   *
+   * @param chunkMetadata metadata of the chunk
+   */
+  public ChunkSuit4CPV(ChunkMetadata chunkMetadata) {
+    this(chunkMetadata, null);
+  }
+
+  /**
+   * Creates a holder with both metadata and data.
+   *
+   * @param chunkMetadata metadata of the chunk
+   * @param batchData data belonging to the chunk; may be {@code null}
+   */
+  public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) {
+    this.chunkMetadata = chunkMetadata;
+    this.batchData = batchData;
+  }
+
+  /** Returns the chunk metadata currently held. */
+  public ChunkMetadata getChunkMetadata() {
+    return chunkMetadata;
+  }
+
+  /** Replaces the chunk metadata. */
+  public void setChunkMetadata(ChunkMetadata chunkMetadata) {
+    this.chunkMetadata = chunkMetadata;
+  }
+
+  /** Returns the batch data currently held, or {@code null} if none has been set. */
+  public BatchData getBatchData() {
+    return batchData;
+  }
+
+  /** Replaces the batch data. */
+  public void setBatchData(BatchData batchData) {
+    this.batchData = batchData;
+  }
+
+  /** Appends a version to the merge-version list. */
+  public void addMergeVersionList(long version) {
+    mergeVersionList.add(version);
+  }
+
+  /** Appends an offset to the merge-offset list. */
+  public void addMergeOffsetList(long offset) {
+    mergeOffsetList.add(offset);
+  }
+
+  /** Returns the live (not copied) list of recorded merge versions. */
+  public List<Long> getMergeVersionList() {
+    return mergeVersionList;
+  }
+
+  /** Returns the live (not copied) list of recorded merge offsets. */
+  public List<Long> getMergeOffsetList() {
+    return mergeOffsetList;
+  }
+
+  /** Returns the version reported by the held chunk metadata. */
+  public long getVersion() {
+    return getChunkMetadata().getVersion();
+  }
+
+  /** Returns the chunk-header offset reported by the held chunk metadata. */
+  public long getOffset() {
+    return getChunkMetadata().getOffsetOfChunkHeader();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 4c8fe12..2d3e168 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -18,13 +18,21 @@
*/
package org.apache.iotdb.tsfile.read.reader.page;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
@@ -100,6 +108,113 @@ public class PageReader implements IPageReader {
valueBuffer.position(timeBufferLength);
}
+  /**
+   * Decodes this page once and distributes its points over the group-by buckets they fall into.
+   *
+   * <p>The query range {@code [startTime, endTime)} is cut into buckets of length {@code interval};
+   * a point with timestamp {@code t} belongs to bucket {@code floor((t - startTime) / interval)}.
+   * Points before {@code curStartTime} are discarded and decoding stops at the first timestamp
+   * that is {@code >= endTime} (this assumes the page's timestamps are ascending). For every
+   * bucket that at least one remaining timestamp maps to, a {@link BatchData} and a copy of
+   * {@code chunkMetadata} with fresh statistics are created. Points that are deleted or rejected
+   * by the filter are not stored, so such a bucket may end up empty.
+   *
+   * <p>Buckets are emitted in ascending index order: the bucket that contains
+   * {@code curStartTime} is appended to {@code currentChunkList}, all others to
+   * {@code futureChunkList}.
+   *
+   * @param startTime inclusive start of the whole query range
+   * @param endTime exclusive end of the whole query range
+   * @param interval bucket length, must be positive
+   * @param curStartTime start of the bucket currently being computed; earlier points are dropped
+   * @param currentChunkList receives the split chunk of the current bucket, if any
+   * @param futureChunkList receives the split chunks of all other buckets
+   * @param chunkMetadata metadata of the chunk this page was read from
+   * @throws IOException if the time decoder fails
+   * @throws IllegalArgumentException if {@code interval} is not positive
+   * @throws UnSupportedDataTypeException if a point has to be decoded and the page's data type is
+   *     not INT32, INT64, FLOAT or DOUBLE
+   */
+  public void split4CPV(long startTime, long endTime, long interval,
+      long curStartTime, List<ChunkSuit4CPV> currentChunkList,
+      List<ChunkSuit4CPV> futureChunkList, ChunkMetadata chunkMetadata)
+      throws IOException { // note: [startTime,endTime), [curStartTime,curEndTime]
+    if (interval <= 0) {
+      throw new IllegalArgumentException("interval must be positive, but was " + interval);
+    }
+    Map<Integer, BatchData> splitBatchDataMap = new HashMap<>();
+    Map<Integer, ChunkMetadata> splitChunkMetadataMap = new HashMap<>();
+    while (timeDecoder.hasNext(timeBuffer)) {
+      long timestamp = timeDecoder.readLong(timeBuffer);
+      if (timestamp < curStartTime) {
+        // Time and value columns are decoded independently: the value of a skipped timestamp
+        // must still be consumed, otherwise every later point is paired with the wrong value.
+        skipValue4CPV();
+        continue;
+      }
+      if (timestamp >= endTime) {
+        break;
+      }
+      // Exact integer arithmetic; going through double loses precision for large timestamps.
+      int idx = (int) Math.floorDiv(timestamp - startTime, interval);
+      BatchData batchData1 = splitBatchDataMap.get(idx);
+      ChunkMetadata chunkMetadata1 = splitChunkMetadataMap.get(idx);
+      if (batchData1 == null) {
+        // first point of this bucket: prepare its batchData and chunkMetadata
+        batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
+        chunkMetadata1 = newSplitChunkMetadata4CPV(chunkMetadata);
+        splitBatchDataMap.put(idx, batchData1);
+        splitChunkMetadataMap.put(idx, chunkMetadata1);
+      }
+      // NOTE(review): the original author marked the statistics updates below with
+      // "TODO skeptical bug ?"; their behavior is left unchanged here.
+      switch (dataType) {
+        case INT32:
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
+            batchData1.putInt(timestamp, anInt);
+            chunkMetadata1.getStatistics().update(timestamp, anInt);
+          }
+          break;
+        case INT64:
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+            batchData1.putLong(timestamp, aLong);
+            chunkMetadata1.getStatistics().update(timestamp, aLong);
+          }
+          break;
+        case FLOAT:
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+            batchData1.putFloat(timestamp, aFloat);
+            chunkMetadata1.getStatistics().update(timestamp, aFloat);
+          }
+          break;
+        case DOUBLE:
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+            batchData1.putDouble(timestamp, aDouble);
+            chunkMetadata1.getStatistics().update(timestamp, aDouble);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
+      }
+    }
+    int curIdx = (int) Math.floorDiv(curStartTime - startTime, interval);
+    // Bucket of the last representable timestamp (endTime - 1). Using it as the inclusive upper
+    // bound keeps a trailing partial bucket when the range is not a multiple of interval.
+    int lastIdx = (int) Math.floorDiv(endTime - 1 - startTime, interval);
+    for (int i = 0; i <= lastIdx; i++) {
+      BatchData bucketData = splitBatchDataMap.get(i);
+      if (bucketData == null) {
+        continue;
+      }
+      ChunkSuit4CPV suit = new ChunkSuit4CPV(splitChunkMetadataMap.get(i), bucketData.flip());
+      if (i == curIdx) {
+        currentChunkList.add(suit);
+      } else {
+        futureChunkList.add(suit);
+      }
+    }
+  }
+
+  /** Reads and discards the next value so the value column stays aligned with the time column. */
+  private void skipValue4CPV() throws IOException {
+    switch (dataType) {
+      case INT32:
+        valueDecoder.readInt(valueBuffer);
+        break;
+      case INT64:
+        valueDecoder.readLong(valueBuffer);
+        break;
+      case FLOAT:
+        valueDecoder.readFloat(valueBuffer);
+        break;
+      case DOUBLE:
+        valueDecoder.readDouble(valueBuffer);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    }
+  }
+
+  /** Builds the per-bucket copy of {@code chunkMetadata} with empty statistics of the page type. */
+  private ChunkMetadata newSplitChunkMetadata4CPV(ChunkMetadata chunkMetadata) {
+    Statistics statistics;
+    switch (dataType) {
+      case INT32:
+        statistics = new IntegerStatistics();
+        break;
+      case INT64:
+        statistics = new LongStatistics();
+        break;
+      case FLOAT:
+        statistics = new FloatStatistics();
+        break;
+      case DOUBLE:
+        statistics = new DoubleStatistics();
+        break;
+      default:
+        // fail here instead of handing null statistics to the metadata
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    }
+    ChunkMetadata chunkMetadata1 = new ChunkMetadata(chunkMetadata.getMeasurementUid(),
+        chunkMetadata.getDataType(), chunkMetadata.getOffsetOfChunkHeader(), statistics);
+    chunkMetadata1.setVersion(chunkMetadata.getVersion()); // don't miss this
+
+    // important, used later for candidate point verification
+    // (1) whether the candidate point itself is in a deleted interval
+    // (2) whether the candidate point is overlapped by a chunk with a larger version number
+    // that has no deleted interval covering this candidate point
+    // The delete interval list is shared as-is; it is not narrowed to the bucket.
+    chunkMetadata1.setDeleteIntervalList(chunkMetadata.getDeleteIntervalList());
+    return chunkMetadata1;
+  }
+
/** @return the returned BatchData may be empty, but never be null */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override