You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/28 09:42:10 UTC
[iotdb] branch master updated: [IOTDB-1942] Support align by device query in new vector (#4435)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1d485 [IOTDB-1942] Support align by device query in new vector (#4435)
3c1d485 is described below
commit 3c1d4856cecba7ff75a65c53339b70eeab058228
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Dec 28 17:41:36 2021 +0800
[IOTDB-1942] Support align by device query in new vector (#4435)
---
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 5 +-
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 6 +-
.../IoTDBAggregationWithoutValueFilterIT.java | 18 +-
.../integration/aligned/IoTDBAlignByDevice2IT.java | 68 +
.../integration/aligned/IoTDBAlignByDeviceIT.java | 1349 ++++++++++++++++++++
.../qp/logical/crud/GroupByFillQueryOperator.java | 2 +-
.../iotdb/db/qp/logical/crud/QueryOperator.java | 151 ++-
.../db/qp/physical/crud/AlignByDevicePlan.java | 139 +-
.../iotdb/db/qp/physical/crud/MeasurementInfo.java | 37 +-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 11 +-
.../db/qp/physical/crud/RawDataQueryPlan.java | 3 +-
.../db/query/dataset/AlignByDeviceDataSet.java | 121 +-
.../org/apache/iotdb/db/utils/SchemaUtils.java | 11 +-
.../session/IoTDBSessionVectorABDeviceIT.java | 27 +-
14 files changed, 1668 insertions(+), 280 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index dc9f52c..2b1509a 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -319,11 +319,8 @@ public class IoTDBAlignByDeviceIT {
"103,root.vehicle.d0,199,null,",
"104,root.vehicle.d0,190,null,",
"105,root.vehicle.d0,199,11.11,",
- "106,root.vehicle.d0,null,null,",
"1000,root.vehicle.d0,55555,1000.11,",
"946684800000,root.vehicle.d0,100,null,",
- "1,root.vehicle.d1,null,null,",
- "1000,root.vehicle.d1,null,null,",
};
try (Connection connection = EnvFactory.getEnv().getConnection();
@@ -357,7 +354,7 @@ public class IoTDBAlignByDeviceIT {
Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
cnt++;
}
- Assert.assertEquals(16, cnt);
+ Assert.assertEquals(13, cnt);
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 8d35ee5..6750588 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -587,14 +587,14 @@ public class IoTDBSimpleQueryIT {
resultSet = statement.executeQuery("select * from root.** align by device");
// has time and device columns
- Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
+ Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
while (resultSet.next()) {
fail();
}
resultSet = statement.executeQuery("select count(*) from root align by device");
// has device column
- Assert.assertEquals(1, resultSet.getMetaData().getColumnCount());
+ Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
while (resultSet.next()) {
fail();
}
@@ -604,7 +604,7 @@ public class IoTDBSimpleQueryIT {
"select count(*) from root where time >= 1 and time <= 100 "
+ "group by ([0, 100), 20ms, 20ms) align by device");
// has time and device columns
- Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
+ Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
while (resultSet.next()) {
fail();
}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
index 5f37945..5d3ff74 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
@@ -23,14 +23,24 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.jdbc.Config;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class})
public class IoTDBAggregationWithoutValueFilterIT {
@@ -236,7 +246,7 @@ public class IoTDBAggregationWithoutValueFilterIT {
boolean hasResultSet =
statement.execute(
- "select count(s1),count (s2),count (s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1");
+ "select count(s1),count(s2),count(s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1");
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java
new file mode 100644
index 0000000..6338bb5
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/** Lets one chunk have more than one page (pages are capped at 3 points in setUp). */
+@Category({LocalStandaloneTest.class})
+public class IoTDBAlignByDevice2IT extends IoTDBAlignByDeviceIT {
+
+ private static int numOfPointsPerPage;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ // TODO: once aligned time series support compaction, enable compaction for this test
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ EnvironmentUtils.cleanEnv();
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java
new file mode 100644
index 0000000..6f429e4
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java
@@ -0,0 +1,1349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.avg;
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.firstValue;
+import static org.apache.iotdb.db.constant.TestConstant.lastValue;
+import static org.apache.iotdb.db.constant.TestConstant.maxTime;
+import static org.apache.iotdb.db.constant.TestConstant.maxValue;
+import static org.apache.iotdb.db.constant.TestConstant.minTime;
+import static org.apache.iotdb.db.constant.TestConstant.minValue;
+import static org.apache.iotdb.db.constant.TestConstant.sum;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test specifically covers aligned time series, which distinguishes it from
+ * integration/IoTDBAlignByDeviceIT.
+ */
+@Category({LocalStandaloneTest.class})
+public class IoTDBAlignByDeviceIT {
+
+ private static final double DELTA = 1e-6;
+ private static final String TIMESTAMP_STR = "Time";
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ // TODO: once aligned time series support compaction, enable compaction for this test
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void selectAllAlignedWithoutValueFilterTest() throws ClassNotFoundException {
+
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1.0,1,1,true,aligned_test1",
+ "2,root.sg1.d1,2.0,2,2,null,aligned_test2",
+ "3,root.sg1.d1,30000.0,null,30000,true,aligned_unseq_test3",
+ "4,root.sg1.d1,4.0,4,null,true,aligned_test4",
+ "5,root.sg1.d1,5.0,5,null,true,aligned_test5",
+ "6,root.sg1.d1,6.0,6,6,true,null",
+ "7,root.sg1.d1,7.0,7,7,false,aligned_test7",
+ "8,root.sg1.d1,8.0,8,8,null,aligned_test8",
+ "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+ "10,root.sg1.d1,null,10,10,true,aligned_test10",
+ "11,root.sg1.d1,11.0,11,11,null,null",
+ "12,root.sg1.d1,12.0,12,12,null,null",
+ "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+ "14,root.sg1.d1,14.0,14,14,null,null",
+ "15,root.sg1.d1,15.0,15,15,null,null",
+ "16,root.sg1.d1,16.0,16,16,null,null",
+ "17,root.sg1.d1,17.0,17,17,null,null",
+ "18,root.sg1.d1,18.0,18,18,null,null",
+ "19,root.sg1.d1,19.0,19,19,null,null",
+ "20,root.sg1.d1,20.0,20,20,null,null",
+ "21,root.sg1.d1,null,null,21,true,null",
+ "22,root.sg1.d1,null,null,22,true,null",
+ "23,root.sg1.d1,230000.0,null,230000,false,null",
+ "24,root.sg1.d1,null,null,24,true,null",
+ "25,root.sg1.d1,null,null,25,true,null",
+ "26,root.sg1.d1,null,null,26,false,null",
+ "27,root.sg1.d1,null,null,27,false,null",
+ "28,root.sg1.d1,null,null,28,false,null",
+ "29,root.sg1.d1,null,null,29,false,null",
+ "30,root.sg1.d1,null,null,30,false,null",
+ "31,root.sg1.d1,null,31,null,null,aligned_test31",
+ "32,root.sg1.d1,null,32,null,null,aligned_test32",
+ "33,root.sg1.d1,null,33,null,null,aligned_test33",
+ "34,root.sg1.d1,null,34,null,null,aligned_test34",
+ "35,root.sg1.d1,null,35,null,null,aligned_test35",
+ "36,root.sg1.d1,null,36,null,null,aligned_test36",
+ "37,root.sg1.d1,null,37,null,null,aligned_test37",
+ "38,root.sg1.d1,null,38,null,null,aligned_test38",
+ "39,root.sg1.d1,null,39,null,null,aligned_test39",
+ "40,root.sg1.d1,null,40,null,null,aligned_test40",
+ };
+
+ String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select * from root.sg1.d1 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectAllAlignedAndNonAlignedTest() throws ClassNotFoundException {
+
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1.0,1,1,true,aligned_test1",
+ "2,root.sg1.d1,2.0,2,2,null,aligned_test2",
+ "3,root.sg1.d1,30000.0,null,30000,true,aligned_unseq_test3",
+ "4,root.sg1.d1,4.0,4,null,true,aligned_test4",
+ "5,root.sg1.d1,5.0,5,null,true,aligned_test5",
+ "6,root.sg1.d1,6.0,6,6,true,null",
+ "7,root.sg1.d1,7.0,7,7,false,aligned_test7",
+ "8,root.sg1.d1,8.0,8,8,null,aligned_test8",
+ "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+ "10,root.sg1.d1,null,10,10,true,aligned_test10",
+ "11,root.sg1.d1,11.0,11,11,null,null",
+ "12,root.sg1.d1,12.0,12,12,null,null",
+ "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+ "14,root.sg1.d1,14.0,14,14,null,null",
+ "15,root.sg1.d1,15.0,15,15,null,null",
+ "16,root.sg1.d1,16.0,16,16,null,null",
+ "17,root.sg1.d1,17.0,17,17,null,null",
+ "18,root.sg1.d1,18.0,18,18,null,null",
+ "19,root.sg1.d1,19.0,19,19,null,null",
+ "20,root.sg1.d1,20.0,20,20,null,null",
+ "21,root.sg1.d1,null,null,21,true,null",
+ "22,root.sg1.d1,null,null,22,true,null",
+ "23,root.sg1.d1,230000.0,null,230000,false,null",
+ "24,root.sg1.d1,null,null,24,true,null",
+ "25,root.sg1.d1,null,null,25,true,null",
+ "26,root.sg1.d1,null,null,26,false,null",
+ "27,root.sg1.d1,null,null,27,false,null",
+ "28,root.sg1.d1,null,null,28,false,null",
+ "29,root.sg1.d1,null,null,29,false,null",
+ "30,root.sg1.d1,null,null,30,false,null",
+ "31,root.sg1.d1,null,31,null,null,aligned_test31",
+ "32,root.sg1.d1,null,32,null,null,aligned_test32",
+ "33,root.sg1.d1,null,33,null,null,aligned_test33",
+ "34,root.sg1.d1,null,34,null,null,aligned_test34",
+ "35,root.sg1.d1,null,35,null,null,aligned_test35",
+ "36,root.sg1.d1,null,36,null,null,aligned_test36",
+ "37,root.sg1.d1,null,37,null,null,aligned_test37",
+ "38,root.sg1.d1,null,38,null,null,aligned_test38",
+ "39,root.sg1.d1,null,39,null,null,aligned_test39",
+ "40,root.sg1.d1,null,40,null,null,aligned_test40",
+ "1,root.sg1.d2,1.0,1,1,true,non_aligned_test1",
+ "2,root.sg1.d2,2.0,2,2,null,non_aligned_test2",
+ "3,root.sg1.d2,3.0,null,3,false,non_aligned_test3",
+ "4,root.sg1.d2,4.0,4,null,true,non_aligned_test4",
+ "5,root.sg1.d2,5.0,5,null,true,non_aligned_test5",
+ "6,root.sg1.d2,6.0,6,6,true,null",
+ "7,root.sg1.d2,7.0,7,7,false,non_aligned_test7",
+ "8,root.sg1.d2,8.0,8,8,null,non_aligned_test8",
+ "9,root.sg1.d2,9.0,9,9,false,non_aligned_test9",
+ "10,root.sg1.d2,null,10,10,true,non_aligned_test10",
+ "11,root.sg1.d2,11.0,11,11,null,null",
+ "12,root.sg1.d2,12.0,12,12,null,null",
+ "13,root.sg1.d2,13.0,13,13,null,null",
+ "14,root.sg1.d2,14.0,14,14,null,null",
+ "15,root.sg1.d2,15.0,15,15,null,null",
+ "16,root.sg1.d2,16.0,16,16,null,null",
+ "17,root.sg1.d2,17.0,17,17,null,null",
+ "18,root.sg1.d2,18.0,18,18,null,null",
+ "19,root.sg1.d2,19.0,19,19,null,null",
+ "20,root.sg1.d2,20.0,20,20,null,null",
+ "21,root.sg1.d2,null,null,21,true,null",
+ "22,root.sg1.d2,null,null,22,true,null",
+ "23,root.sg1.d2,null,null,23,true,null",
+ "24,root.sg1.d2,null,null,24,true,null",
+ "25,root.sg1.d2,null,null,25,true,null",
+ "26,root.sg1.d2,null,null,26,false,null",
+ "27,root.sg1.d2,null,null,27,false,null",
+ "28,root.sg1.d2,null,null,28,false,null",
+ "29,root.sg1.d2,null,null,29,false,null",
+ "30,root.sg1.d2,null,null,30,false,null",
+ "31,root.sg1.d2,null,31,null,null,non_aligned_test31",
+ "32,root.sg1.d2,null,32,null,null,non_aligned_test32",
+ "33,root.sg1.d2,null,33,null,null,non_aligned_test33",
+ "34,root.sg1.d2,null,34,null,null,non_aligned_test34",
+ "35,root.sg1.d2,null,35,null,null,non_aligned_test35",
+ "36,root.sg1.d2,null,36,null,null,non_aligned_test36",
+ "37,root.sg1.d2,null,37,null,null,non_aligned_test37",
+ "38,root.sg1.d2,null,38,null,null,non_aligned_test38",
+ "39,root.sg1.d2,null,39,null,null,non_aligned_test39",
+ "40,root.sg1.d2,null,40,null,null,non_aligned_test40",
+ };
+
+ String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select * from root.sg1.* align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectAllAlignedWithTimeFilterTest() throws ClassNotFoundException {
+
+ String[] retArray =
+ new String[] {
+ "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+ "10,root.sg1.d1,null,10,10,true,aligned_test10",
+ "11,root.sg1.d1,11.0,11,11,null,null",
+ "12,root.sg1.d1,12.0,12,12,null,null",
+ "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+ "14,root.sg1.d1,14.0,14,14,null,null",
+ "15,root.sg1.d1,15.0,15,15,null,null",
+ "16,root.sg1.d1,16.0,16,16,null,null",
+ "17,root.sg1.d1,17.0,17,17,null,null",
+ "18,root.sg1.d1,18.0,18,18,null,null",
+ "19,root.sg1.d1,19.0,19,19,null,null",
+ "20,root.sg1.d1,20.0,20,20,null,null",
+ "21,root.sg1.d1,null,null,21,true,null",
+ "22,root.sg1.d1,null,null,22,true,null",
+ "23,root.sg1.d1,230000.0,null,230000,false,null",
+ "24,root.sg1.d1,null,null,24,true,null",
+ "25,root.sg1.d1,null,null,25,true,null",
+ "26,root.sg1.d1,null,null,26,false,null",
+ "27,root.sg1.d1,null,null,27,false,null",
+ "28,root.sg1.d1,null,null,28,false,null",
+ "29,root.sg1.d1,null,null,29,false,null",
+ "30,root.sg1.d1,null,null,30,false,null",
+ "31,root.sg1.d1,null,31,null,null,aligned_test31",
+ "32,root.sg1.d1,null,32,null,null,aligned_test32",
+ "33,root.sg1.d1,null,33,null,null,aligned_test33",
+ };
+
+ String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select * from root.sg1.d1 where time >= 9 and time <= 33 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectSomeAlignedWithoutValueFilterTest1() throws ClassNotFoundException {
+
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1.0,true,aligned_test1",
+ "2,root.sg1.d1,2.0,null,aligned_test2",
+ "3,root.sg1.d1,30000.0,true,aligned_unseq_test3",
+ "4,root.sg1.d1,4.0,true,aligned_test4",
+ "5,root.sg1.d1,5.0,true,aligned_test5",
+ "6,root.sg1.d1,6.0,true,null",
+ "7,root.sg1.d1,7.0,false,aligned_test7",
+ "8,root.sg1.d1,8.0,null,aligned_test8",
+ "9,root.sg1.d1,9.0,false,aligned_test9",
+ "10,root.sg1.d1,null,true,aligned_test10",
+ "11,root.sg1.d1,11.0,null,null",
+ "12,root.sg1.d1,12.0,null,null",
+ "13,root.sg1.d1,130000.0,true,aligned_unseq_test13",
+ "14,root.sg1.d1,14.0,null,null",
+ "15,root.sg1.d1,15.0,null,null",
+ "16,root.sg1.d1,16.0,null,null",
+ "17,root.sg1.d1,17.0,null,null",
+ "18,root.sg1.d1,18.0,null,null",
+ "19,root.sg1.d1,19.0,null,null",
+ "20,root.sg1.d1,20.0,null,null",
+ "21,root.sg1.d1,null,true,null",
+ "22,root.sg1.d1,null,true,null",
+ "23,root.sg1.d1,230000.0,false,null",
+ "24,root.sg1.d1,null,true,null",
+ "25,root.sg1.d1,null,true,null",
+ "26,root.sg1.d1,null,false,null",
+ "27,root.sg1.d1,null,false,null",
+ "28,root.sg1.d1,null,false,null",
+ "29,root.sg1.d1,null,false,null",
+ "30,root.sg1.d1,null,false,null",
+ "31,root.sg1.d1,null,null,aligned_test31",
+ "32,root.sg1.d1,null,null,aligned_test32",
+ "33,root.sg1.d1,null,null,aligned_test33",
+ "34,root.sg1.d1,null,null,aligned_test34",
+ "35,root.sg1.d1,null,null,aligned_test35",
+ "36,root.sg1.d1,null,null,aligned_test36",
+ "37,root.sg1.d1,null,null,aligned_test37",
+ "38,root.sg1.d1,null,null,aligned_test38",
+ "39,root.sg1.d1,null,null,aligned_test39",
+ "40,root.sg1.d1,null,null,aligned_test40",
+ };
+
+ String[] columnNames = {"Device", "s1", "s4", "s5"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select s1,s4,s5 from root.sg1.d1 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectSomeAlignedWithoutValueFilterTest2() throws ClassNotFoundException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1.0,true",
+ "2,root.sg1.d1,2.0,null",
+ "3,root.sg1.d1,30000.0,true",
+ "4,root.sg1.d1,4.0,true",
+ "5,root.sg1.d1,5.0,true",
+ "6,root.sg1.d1,6.0,true",
+ "7,root.sg1.d1,7.0,false",
+ "8,root.sg1.d1,8.0,null",
+ "9,root.sg1.d1,9.0,false",
+ "10,root.sg1.d1,null,true",
+ "11,root.sg1.d1,11.0,null",
+ "12,root.sg1.d1,12.0,null",
+ "13,root.sg1.d1,130000.0,true",
+ "14,root.sg1.d1,14.0,null",
+ "15,root.sg1.d1,15.0,null",
+ "16,root.sg1.d1,16.0,null",
+ "17,root.sg1.d1,17.0,null",
+ "18,root.sg1.d1,18.0,null",
+ "19,root.sg1.d1,19.0,null",
+ "20,root.sg1.d1,20.0,null",
+ "21,root.sg1.d1,null,true",
+ "22,root.sg1.d1,null,true",
+ "23,root.sg1.d1,230000.0,false",
+ "24,root.sg1.d1,null,true",
+ "25,root.sg1.d1,null,true",
+ "26,root.sg1.d1,null,false",
+ "27,root.sg1.d1,null,false",
+ "28,root.sg1.d1,null,false",
+ "29,root.sg1.d1,null,false",
+ "30,root.sg1.d1,null,false",
+ };
+
+ String[] columnNames = {"Device", "s1", "s4"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select s1,s4 from root.sg1.d1 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectSomeAlignedWithTimeFilterTest() throws ClassNotFoundException {
+ String[] retArray =
+ new String[] {
+ "16,root.sg1.d1,16.0,null,null",
+ "17,root.sg1.d1,17.0,null,null",
+ "18,root.sg1.d1,18.0,null,null",
+ "19,root.sg1.d1,19.0,null,null",
+ "20,root.sg1.d1,20.0,null,null",
+ "21,root.sg1.d1,null,true,null",
+ "22,root.sg1.d1,null,true,null",
+ "23,root.sg1.d1,230000.0,false,null",
+ "24,root.sg1.d1,null,true,null",
+ "25,root.sg1.d1,null,true,null",
+ "26,root.sg1.d1,null,false,null",
+ "27,root.sg1.d1,null,false,null",
+ "28,root.sg1.d1,null,false,null",
+ "29,root.sg1.d1,null,false,null",
+ "30,root.sg1.d1,null,false,null",
+ "31,root.sg1.d1,null,null,aligned_test31",
+ "32,root.sg1.d1,null,null,aligned_test32",
+ "33,root.sg1.d1,null,null,aligned_test33",
+ "34,root.sg1.d1,null,null,aligned_test34",
+ };
+
+ String[] columnNames = {"Device", "s1", "s4", "s5"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select s1,s4,s5 from root.sg1.d1 where time >= 16 and time <= 34 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1", "20", "29", "28", "19", "20"};
+ String[] columnNames = {
+ "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedAndNonAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1,20,29,28,19,20,", "root.sg1.d2,19,29,28,18,19,"};
+ String[] columnNames = {
+ "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(*) from root.sg1.* align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(resultSet.getString(index)).append(",");
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedWithTimeFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1", "12", "15", "22", "13", "6"};
+ String[] columnNames = {
+ "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*) from root.sg1.d1 where time >= 9 and time <= 33 align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** aggregate multi columns of aligned timeseries in one SQL */
+ @Test
+ public void aggregateSomeAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+ double[] retArray =
+ new double[] {
+ 20, 29, 28, 390184, 130549, 390417, 19509.2, 4501.689655172413, 13943.464285714286
+ };
+ String[] columnNames = {
+ "Device",
+ "count(s1)",
+ "count(s2)",
+ "count(s3)",
+ "sum(s1)",
+ "sum(s2)",
+ "sum(s3)",
+ "avg(s1)",
+ "avg(s2)",
+ "avg(s3)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1),count(s2),count(s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1 align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ double[] ans = new double[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 1; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i - 1] = Double.parseDouble(resultSet.getString(index));
+ assertEquals(retArray[i - 1], ans[i - 1], DELTA);
+ }
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1", "11"};
+ String[] columnNames = {"Device", "count(s4)"};
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute("select count(s4) from root.sg1.d1 where s4 = true align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void aggregationFuncAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray =
+ new String[] {"root.sg1.d1", "8", "42.0", "5.25", "1.0", "9.0", "1", "9", "1.0", "9.0"};
+ String[] columnNames = {
+ "Device",
+ "count(s1)",
+ "sum(s1)",
+ "avg(s1)",
+ "first_value(s1)",
+ "last_value(s1)",
+ "min_time(s1)",
+ "max_time(s1)",
+ "min_value(s1)",
+ "max_value(s1)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s1), avg(s1), "
+ + "first_value(s1), last_value(s1), "
+ + "min_time(s1), max_time(s1),"
+ + "max_value(s1), min_value(s1) from root.sg1.d1 where s1 < 10 "
+ + "align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1", "6", "6", "9", "11", "6"};
+ String[] columnNames = {
+ "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*) from root.sg1.d1 where s4 = true " + "align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"root.sg1.d1", "160016.0", "11", "1", "13"};
+ String[] columnNames = {
+ "Device", "sum(s1)", "count(s4)", "min_value(s3)", "max_time(s2)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select sum(s1), count(s4), min_value(s3), max_time(s2) from root.sg1.d1 where s4 = true "
+ + "align by device");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countSumAvgGroupByTimeTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,4,40.0,7.5",
+ "11,root.sg1.d1,10,130142.0,13014.2",
+ "21,root.sg1.d1,1,null,230000.0",
+ "31,root.sg1.d1,0,355.0,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where time > 5 GROUP BY ([1, 41), 10ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(count("s1"))
+ + ","
+ + resultSet.getString(sum("s2"))
+ + ","
+ + resultSet.getString(avg("s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueGroupByTimeTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,10,6.0,10,6",
+ "11,root.sg1.d1,130000,11.0,20,11",
+ "21,root.sg1.d1,230000,230000.0,null,21",
+ "31,root.sg1.d1,null,null,40,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where time > 5 GROUP BY ([1, 41), 10ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(maxValue("s3"))
+ + ","
+ + resultSet.getString(minValue("s1"))
+ + ","
+ + resultSet.getString(maxTime("s2"))
+ + ","
+ + resultSet.getString(minTime("s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastGroupByTimeTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,null,null", "6,root.sg1.d1,true,aligned_test7",
+ "11,root.sg1.d1,true,aligned_unseq_test13", "16,root.sg1.d1,null,null",
+ "21,root.sg1.d1,true,null", "26,root.sg1.d1,false,null",
+ "31,root.sg1.d1,null,aligned_test31", "36,root.sg1.d1,null,aligned_test36"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + "where time > 5 and time < 38 GROUP BY ([1, 41), 5ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(lastValue("s4"))
+ + ","
+ + resultSet.getString(firstValue("s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void groupByWithWildcardTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,9,9,8,8,9,9.0,10,10,true,aligned_test10",
+ "11,root.sg1.d1,10,10,10,1,1,20.0,20,20,true,aligned_unseq_test13",
+ "21,root.sg1.d1,1,0,10,10,0,230000.0,null,30,false,null",
+ "31,root.sg1.d1,0,10,0,0,10,null,40,null,null,aligned_test40"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*), last_value(*) from root.sg1.d1 GROUP BY ([1, 41), 10ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(count("s1"))
+ + ","
+ + resultSet.getString(count("s2"))
+ + ","
+ + resultSet.getString(count("s3"))
+ + ","
+ + resultSet.getString(count("s4"))
+ + ","
+ + resultSet.getString(count("s5"))
+ + ","
+ + resultSet.getString(lastValue("s1"))
+ + ","
+ + resultSet.getString(lastValue("s2"))
+ + ","
+ + resultSet.getString(lastValue("s3"))
+ + ","
+ + resultSet.getString(lastValue("s4"))
+ + ","
+ + resultSet.getString(lastValue("s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void groupByWithNonAlignedTimeseriesTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,0,null,null",
+ "7,root.sg1.d1,3,34.0,8.0",
+ "13,root.sg1.d1,4,130045.0,32511.25",
+ "19,root.sg1.d1,2,39.0,19.5",
+ "25,root.sg1.d1,0,null,null",
+ "31,root.sg1.d1,0,130.0,null",
+ "37,root.sg1.d1,0,154.0,null",
+ "1,root.sg1.d2,0,null,null",
+ "7,root.sg1.d2,3,34.0,8.0",
+ "13,root.sg1.d2,4,58.0,14.5",
+ "19,root.sg1.d2,2,39.0,19.5",
+ "25,root.sg1.d2,0,null,null",
+ "31,root.sg1.d2,0,130.0,null",
+ "37,root.sg1.d2,0,154.0,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.* "
+ + "where time > 5 GROUP BY ([1, 41), 4ms, 6ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(count("s1"))
+ + ","
+ + resultSet.getString(sum("s2"))
+ + ","
+ + resultSet.getString(avg("s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgPreviousFillTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,4,40.0,7.5",
+ "11,root.sg1.d1,10,130142.0,13014.2",
+ "21,root.sg1.d1,1,130142.0,230000.0",
+ "31,root.sg1.d1,0,355.0,230000.0"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where time > 5 GROUP BY ([1, 41), 10ms) FILL (previous, 15ms) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(count("s1"))
+ + ","
+ + resultSet.getString(sum("s2"))
+ + ","
+ + resultSet.getString(avg("s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgValueFillTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1,3.14,30000.0",
+ "6,root.sg1.d1,4,40.0,7.5",
+ "11,root.sg1.d1,5,130052.0,26010.4",
+ "16,root.sg1.d1,5,90.0,18.0",
+ "21,root.sg1.d1,1,3.14,230000.0",
+ "26,root.sg1.d1,0,3.14,3.14",
+ "31,root.sg1.d1,0,3.14,3.14"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 36), 5ms) FILL (3.14) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(count("s1"))
+ + ","
+ + resultSet.getString(sum("s2"))
+ + ","
+ + resultSet.getString(avg("s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimePreviousUntilLastFillTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,30000,6.0,9,3",
+ "11,root.sg1.d1,130000,11.0,20,11",
+ "21,root.sg1.d1,230000,230000.0,null,23",
+ "31,root.sg1.d1,null,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms) FILL(previousUntilLast) align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(maxValue("s3"))
+ + ","
+ + resultSet.getString(minValue("s1"))
+ + ","
+ + resultSet.getString(maxTime("s2"))
+ + ","
+ + resultSet.getString(minTime("s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeValueFillTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,30000,30000.0,null,3",
+ "6,root.sg1.d1,10,6.0,10,6",
+ "11,root.sg1.d1,130000,11.0,15,11",
+ "16,root.sg1.d1,20,16.0,20,16",
+ "21,root.sg1.d1,230000,230000.0,null,21",
+ "26,root.sg1.d1,29,null,null,26"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 31), 5ms) FILL ('fill string') align by device");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("Device")
+ + ","
+ + resultSet.getString(maxValue("s3"))
+ + ","
+ + resultSet.getString(minValue("s1"))
+ + ","
+ + resultSet.getString(maxTime("s2"))
+ + ","
+ + resultSet.getString(minTime("s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
index 485432d..f6f71c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
@@ -41,7 +41,7 @@ public class GroupByFillQueryOperator extends GroupByQueryOperator {
protected AlignByDevicePlan generateAlignByDevicePlan(PhysicalGenerator generator)
throws QueryProcessException {
AlignByDevicePlan alignByDevicePlan = super.generateAlignByDevicePlan(generator);
- alignByDevicePlan.setGroupByTimePlan(initGroupByTimeFillPlan(new GroupByTimeFillPlan()));
+ alignByDevicePlan.setGroupByFillPlan(initGroupByTimeFillPlan(new GroupByTimeFillPlan()));
return alignByDevicePlan;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 134d57d..4653ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -23,14 +23,12 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.index.common.IndexType;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -41,7 +39,6 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -54,7 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.iotdb.db.utils.SchemaUtils.getAggregationType;
+import static org.apache.iotdb.db.utils.SchemaUtils.getSeriesTypeByPath;
public class QueryOperator extends Operator {
@@ -173,8 +170,17 @@ public class QueryOperator extends Operator {
}
public void check() throws LogicalOperatorException {
- if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
- throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
+ if (isAlignByDevice()) {
+ if (selectComponent.hasTimeSeriesGeneratingFunction()) {
+ throw new LogicalOperatorException(
+ "ALIGN BY DEVICE clause is not supported in UDF queries.");
+ }
+
+ for (PartialPath path : selectComponent.getPaths()) {
+ if (path.getNodes().length > 1) {
+ throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+ }
+ }
}
}
@@ -235,105 +241,99 @@ public class QueryOperator extends Operator {
throws QueryProcessException {
AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
- List<PartialPath> prefixPaths = fromComponent.getPrefixPaths();
// remove stars in fromPaths and get deviceId with deduplication
- List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
- List<ResultColumn> resultColumns = selectComponent.getResultColumns();
+ List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
+ List<ResultColumn> resultColumns =
+ convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
// to record result measurement columns
List<String> measurements = new ArrayList<>();
Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
List<PartialPath> paths = new ArrayList<>();
+ List<String> aggregations = new ArrayList<>();
- for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
+ // per suffix in SELECT
+ for (int i = 0; i < resultColumns.size(); i++) {
ResultColumn resultColumn = resultColumns.get(i);
- Expression suffixExpression = resultColumn.getExpression();
- PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+ PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression());
String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
-
// to record measurements in the loop of a suffix path
Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+ // concat suffix with per device
for (PartialPath device : devices) {
PartialPath fullPath = device.concatPath(suffixPath);
try {
// remove stars in SELECT to get actual paths
List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath);
- if (suffixPath.getNodes().length > 1) {
- throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
- }
if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
throw new QueryProcessException(
String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
}
- if (actualPaths.isEmpty()) {
- String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
- if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
- measurementInfoMap.putIfAbsent(
- nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
+ for (MeasurementPath path : actualPaths) {
+ MeasurementInfo measurementInfo =
+ new MeasurementInfo(getMeasurementName(path, aggregation));
+ TSDataType columnDataType = getSeriesTypeByPath(path, aggregation);
+ if (aggregation != null) {
+ aggregations.add(aggregation);
}
- } else {
- for (PartialPath path : actualPaths) {
- String measurementName = getMeasurementName(path, aggregation);
- TSDataType measurementDataType = path.getSeriesType();
- TSDataType columnDataType = getAggregationType(aggregation);
- columnDataType = columnDataType == null ? measurementDataType : columnDataType;
- MeasurementInfo measurementInfo =
- measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
-
- if (resultColumn.hasAlias()) {
- measurementInfo.setMeasurementAlias(resultColumn.getAlias());
- }
-
- // check datatype consistency
- // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
- // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
- if (measurementInfo.getColumnDataType() != null) {
- if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
- throw new QueryProcessException(
- "The data types of the same measurement column should be the same across devices.");
- }
- } else {
- measurementInfo.setColumnDataType(columnDataType);
- measurementInfo.setMeasurementDataType(measurementDataType);
- }
-
- measurementSetOfGivenSuffix.add(measurementName);
- measurementInfo.setMeasurementType(MeasurementType.Exist);
- measurementInfoMap.put(measurementName, measurementInfo);
- // update paths
- paths.add(path);
+ checkDataTypeConsistency(
+ columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement()));
+
+ if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) {
+ measurementInfo.setMeasurementAlias(
+ resultColumn.hasAlias() ? resultColumn.getAlias() : null);
+ measurementInfo.setColumnDataType(columnDataType);
+ measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo);
}
+ measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement());
+ paths.add(path);
}
} catch (MetadataException | QueryProcessException e) {
throw new QueryProcessException(e.getMessage());
}
}
- // Note that in the loop of a suffix path, set is used.
- // And across the loops of suffix paths, list is used.
- // e.g. select *,s1 from root.sg.d0, root.sg.d1
- // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
- // for suffix s1, measurementSetOfGivenSuffix = {s1}
- // therefore the final measurements is [s1,s2,s3,s1].
- measurements.addAll(measurementSetOfGivenSuffix);
+ if (measurementSetOfGivenSuffix.isEmpty()) {
+ measurements.add(suffixPath.toString());
+ } else {
+ // Note that in the loop of a suffix path, set is used.
+ // And across the loops of suffix paths, list is used.
+ // e.g. select *,s1 from root.sg.d0, root.sg.d1
+ // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+ // for suffix s1, measurementSetOfGivenSuffix = {s1}
+ // therefore the final measurements is [s1,s2,s3,s1].
+ measurements.addAll(measurementSetOfGivenSuffix);
+ }
}
- List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements);
// assigns to alignByDevicePlan
- alignByDevicePlan.setMeasurements(trimMeasurements);
- alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
- alignByDevicePlan.setDevices(devices);
+ alignByDevicePlan.setMeasurements(measurements);
alignByDevicePlan.setPaths(paths);
+ alignByDevicePlan.setAggregations(aggregations);
+ alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
alignByDevicePlan.setEnableTracing(enableTracing);
+ alignByDevicePlan.deduplicate(generator);
+
if (whereComponent != null) {
alignByDevicePlan.setDeviceToFilterMap(
- concatFilterByDevice(devices, whereComponent.getFilterOperator()));
+ concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator()));
}
return alignByDevicePlan;
}
+ private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo)
+ throws QueryProcessException {
+ // check datatype consistency
+ // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+ // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+ if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) {
+ throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE);
+ }
+ }
+
protected void convertSpecialClauseValues(QueryPlan queryPlan) {
if (specialClauseComponent != null) {
queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull());
@@ -345,16 +345,16 @@ public class QueryOperator extends Operator {
}
}
- private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
- throws QueryProcessException {
+ private List<ResultColumn> convertSpecialClauseValues(
+ QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
convertSpecialClauseValues(queryPlan);
// sLimit trim on the measurementColumnList
if (specialClauseComponent.hasSlimit()) {
int seriesSLimit = specialClauseComponent.getSeriesLimit();
int seriesOffset = specialClauseComponent.getSeriesOffset();
- return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
+ return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
}
- return measurements;
+ return resultColumns;
}
private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -385,19 +385,16 @@ public class QueryOperator extends Operator {
*/
private String getMeasurementName(PartialPath path, String aggregation) {
String initialMeasurement = path.getMeasurement();
- if (path instanceof AlignedPath) {
- String subMeasurement = ((AlignedPath) path).getMeasurement(0);
- initialMeasurement += TsFileConstant.PATH_SEPARATOR + subMeasurement;
- }
if (aggregation != null) {
initialMeasurement = aggregation + "(" + initialMeasurement + ")";
}
return initialMeasurement;
}
- private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
+ private List<ResultColumn> slimitTrimColumn(
+ List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
throws QueryProcessException {
- int size = columnList.size();
+ int size = resultColumns.size();
// check parameter range
if (seriesOffset >= size) {
@@ -411,14 +408,15 @@ public class QueryOperator extends Operator {
}
// trim seriesPath list
- return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
+ return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
}
// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10]
private Map<String, IExpression> concatFilterByDevice(
- List<PartialPath> devices, FilterOperator operator) throws QueryProcessException {
+ AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator)
+ throws QueryProcessException {
Map<String, IExpression> deviceToFilterMap = new HashMap<>();
Set<PartialPath> filterPaths = new HashSet<>();
Iterator<PartialPath> deviceIterator = devices.iterator();
@@ -430,6 +428,7 @@ public class QueryOperator extends Operator {
concatFilterPath(device, newOperator, filterPaths);
} catch (LogicalOptimizeException | MetadataException e) {
deviceIterator.remove();
+ alignByDevicePlan.removeDevice(device.getFullPath());
continue;
}
// transform to a list so it can be indexed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 37b3a1f..7132cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -41,17 +42,21 @@ public class AlignByDevicePlan extends QueryPlan {
"The paths of the SELECT clause can only be measurements or STAR.";
public static final String ALIAS_ERROR_MESSAGE =
"alias %s can only be matched with one time series";
+ public static final String DATATYPE_ERROR_MESSAGE =
+ "The data types of the same measurement column should be the same across devices.";
// to record result measurement columns, e.g. temperature, status, speed
private List<String> measurements;
- private List<TSDataType> dataTypes;
private Map<String, MeasurementInfo> measurementInfoMap;
+ private List<PartialPath> deduplicatePaths = new ArrayList<>();
+ private List<String> aggregations;
- // to check data type consistency for the same name sensor of different devices
- private List<PartialPath> devices;
+ // for each device, the indexes (into the deduplicated paths) of the paths to execute
+ private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>();
private Map<String, IExpression> deviceToFilterMap;
private GroupByTimePlan groupByTimePlan;
+ private GroupByTimeFillPlan groupByFillPlan;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
@@ -61,7 +66,41 @@ public class AlignByDevicePlan extends QueryPlan {
@Override
public void deduplicate(PhysicalGenerator physicalGenerator) {
- // do nothing
+ Set<String> pathWithAggregationSet = new LinkedHashSet<>();
+ List<String> deduplicatedAggregations = new ArrayList<>();
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = paths.get(i);
+ String aggregation = aggregations != null ? aggregations.get(i) : null;
+ String pathStrWithAggregation = getPathStrWithAggregation(path, aggregation);
+ if (!pathWithAggregationSet.contains(pathStrWithAggregation)) {
+ pathWithAggregationSet.add(pathStrWithAggregation);
+ deduplicatePaths.add(path);
+ if (this.aggregations != null) {
+ deduplicatedAggregations.add(this.aggregations.get(i));
+ }
+ deviceToPathIndex
+ .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+ .add(deduplicatePaths.size() - 1);
+ }
+ }
+ setAggregations(deduplicatedAggregations);
+ this.paths = null;
+ }
+
+ public List<PartialPath> getDeduplicatePaths() {
+ return deduplicatePaths;
+ }
+
+ public void removeDevice(String device) {
+ deviceToPathIndex.remove(device);
+ }
+
+ public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+ this.measurementInfoMap = measurementInfoMap;
+ }
+
+ public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+ return measurementInfoMap;
}
@Override
@@ -69,51 +108,32 @@ public class AlignByDevicePlan extends QueryPlan {
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
List<String> respColumns = new ArrayList<>();
- List<String> columnsTypes = new ArrayList<>();
+ List<String> columnTypes = new ArrayList<>();
// the DEVICE column of ALIGN_BY_DEVICE result
respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
- columnsTypes.add(TSDataType.TEXT.toString());
-
- // get column types and do deduplication
- List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
- deduplicatedColumnsType.add(TSDataType.TEXT);
+ columnTypes.add(TSDataType.TEXT.toString());
Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
- Map<String, MeasurementInfo> measurementInfoMap = getMeasurementInfoMap();
-
// build column header with constant and non exist column and deduplication
- List<String> measurements = getMeasurements();
for (String measurement : measurements) {
MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
TSDataType type = TSDataType.TEXT;
- switch (measurementInfo.getMeasurementType()) {
- case Exist:
- type = measurementInfo.getColumnDataType();
- break;
- case NonExist:
- case Constant:
- type = TSDataType.TEXT;
+ String measurementName = measurement;
+ if (measurementInfo != null) {
+ type = measurementInfo.getColumnDataType();
+ measurementName = measurementInfo.getMeasurementAlias();
}
- String measurementAlias = measurementInfo.getMeasurementAlias();
- respColumns.add(measurementAlias != null ? measurementAlias : measurement);
- columnsTypes.add(type.toString());
+ respColumns.add(measurementName != null ? measurementName : measurement);
+ columnTypes.add(type.toString());
- if (!deduplicatedMeasurements.contains(measurement)) {
- deduplicatedMeasurements.add(measurement);
- deduplicatedColumnsType.add(type);
- }
+ deduplicatedMeasurements.add(measurement);
}
- // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
- // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
- setMeasurements(new ArrayList<>(deduplicatedMeasurements));
- setDataTypes(deduplicatedColumnsType);
-
- // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
- setPaths(null);
+ // save deduplicated measurements in AlignByDevicePlan for AlignByDeviceDataSet to use.
+ measurements = new ArrayList<>(deduplicatedMeasurements);
resp.setColumns(respColumns);
- resp.setDataTypeList(columnsTypes);
+ resp.setDataTypeList(columnTypes);
if (getOperatorType() == OperatorType.AGGREGATION) {
resp.setIgnoreTimeStamp(true);
}
@@ -129,20 +149,20 @@ public class AlignByDevicePlan extends QueryPlan {
}
@Override
- public List<TSDataType> getDataTypes() {
- return dataTypes;
+ public List<String> getAggregations() {
+ return aggregations;
}
- public void setDataTypes(List<TSDataType> dataTypes) {
- this.dataTypes = dataTypes;
+ public void setAggregations(List<String> aggregations) {
+ this.aggregations = aggregations.isEmpty() ? null : aggregations;
}
- public void setDevices(List<PartialPath> devices) {
- this.devices = devices;
+ public Map<String, List<Integer>> getDeviceToPathIndex() {
+ return deviceToPathIndex;
}
- public List<PartialPath> getDevices() {
- return devices;
+ public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) {
+ this.deviceToPathIndex = deviceToPathIndex;
}
public Map<String, IExpression> getDeviceToFilterMap() {
@@ -162,6 +182,15 @@ public class AlignByDevicePlan extends QueryPlan {
this.setOperatorType(OperatorType.GROUP_BY_TIME);
}
+ public GroupByTimeFillPlan getGroupByFillPlan() {
+ return groupByFillPlan;
+ }
+
+ public void setGroupByFillPlan(GroupByTimeFillPlan groupByFillPlan) {
+ this.groupByFillPlan = groupByFillPlan;
+ this.setOperatorType(OperatorType.GROUP_BY_FILL);
+ }
+
public FillQueryPlan getFillQueryPlan() {
return fillQueryPlan;
}
@@ -180,23 +209,11 @@ public class AlignByDevicePlan extends QueryPlan {
this.setOperatorType(Operator.OperatorType.AGGREGATION);
}
- public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
- this.measurementInfoMap = measurementInfoMap;
- }
-
- public Map<String, MeasurementInfo> getMeasurementInfoMap() {
- return measurementInfoMap;
- }
-
- /**
- * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
- * that do not exist in any device, data type is considered as String. The value is considered as
- * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
- * considered as String and the value is the measurement name.
- */
- public enum MeasurementType {
- Exist,
- NonExist,
- Constant
+ private String getPathStrWithAggregation(PartialPath path, String aggregation) {
+ String initialPath = path.getFullPath();
+ if (aggregation != null) {
+ initialPath = aggregation + "(" + initialPath + ")";
+ }
+ return initialPath;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
index 7c6dd17..c91493c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -18,51 +18,38 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class MeasurementInfo {
public MeasurementInfo() {}
- public MeasurementInfo(MeasurementType measurementType) {
- this.measurementType = measurementType;
+ public MeasurementInfo(String measurement) {
+ this.measurement = measurement;
}
+ private String measurement;
+
// select s1, s2 as speed from root, then s2 -> speed
private String measurementAlias;
- // to record different kinds of measurement
- private MeasurementType measurementType;
-
- // to record the real type of the measurement, used for actual query
- private TSDataType measurementDataType;
-
// to record the datatype of the column in the result set
private TSDataType columnDataType;
- public void setMeasurementAlias(String measurementAlias) {
- this.measurementAlias = measurementAlias;
+ public void setMeasurement(String measurement) {
+ this.measurement = measurement;
}
- public String getMeasurementAlias() {
- return measurementAlias;
+ public String getMeasurement() {
+ return measurement;
}
- public void setMeasurementType(MeasurementType measurementType) {
- this.measurementType = measurementType;
- }
-
- public MeasurementType getMeasurementType() {
- return measurementType;
- }
-
- public void setMeasurementDataType(TSDataType measurementDataType) {
- this.measurementDataType = measurementDataType;
+ public void setMeasurementAlias(String measurementAlias) {
+ this.measurementAlias = measurementAlias;
}
- public TSDataType getMeasurementDataType() {
- return measurementDataType;
+ public String getMeasurementAlias() {
+ return measurementAlias;
}
public void setColumnDataType(TSDataType columnDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 10afc95..d8dd931 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -126,14 +126,11 @@ public abstract class QueryPlan extends PhysicalPlan {
@Override
public void setPaths(List<PartialPath> paths) {
- if (paths == null) this.paths = null; // align by device
- else {
- List<MeasurementPath> measurementPaths = new ArrayList<>();
- for (PartialPath path : paths) {
- measurementPaths.add((MeasurementPath) path);
- }
- this.paths = measurementPaths;
+ List<MeasurementPath> measurementPaths = new ArrayList<>();
+ for (PartialPath path : paths) {
+ measurementPaths.add((MeasurementPath) path);
}
+ this.paths = measurementPaths;
}
public List<TSDataType> getDataTypes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 863a586..0f63271 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -128,9 +128,10 @@ public class RawDataQueryPlan extends QueryPlan {
path -> {
Set<String> set =
deviceToMeasurements.computeIfAbsent(path.getDevice(), key -> new HashSet<>());
- set.add(path.getMeasurement());
if (path instanceof AlignedPath) {
set.addAll(((AlignedPath) path).getMeasurementList());
+ } else {
+ set.add(path.getMeasurement());
}
});
this.deduplicatedPaths = deduplicatedPaths;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 620bfae..e138bb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -19,20 +19,17 @@
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IQueryRouter;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,8 +45,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
/** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
public class AlignByDeviceDataSet extends QueryDataSet {
@@ -60,39 +55,48 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private IExpression expression;
private List<String> measurements;
- private List<PartialPath> devices;
+ private List<PartialPath> paths;
+ private List<String> aggregations;
+ private Map<String, List<Integer>> deviceToPathIndex;
private Map<String, IExpression> deviceToFilterMap;
- private Map<String, MeasurementInfo> measurementInfoMap;
private GroupByTimePlan groupByTimePlan;
+ private GroupByTimeFillPlan groupByFillPlan;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
private RawDataQueryPlan rawDataQueryPlan;
private boolean curDataSetInitialized;
- private PartialPath currentDevice;
private QueryDataSet currentDataSet;
- private Iterator<PartialPath> deviceIterator;
+ private Iterator<String> deviceIterator;
+ private String currentDevice;
private List<String> executeColumns;
private int pathsNum = 0;
public AlignByDeviceDataSet(
AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
- super(null, alignByDevicePlan.getDataTypes());
+ super(null, null);
// align by device's column number is different from other datasets
// TODO I don't know whether it's right or not in AlignedPath, remember to check here while
// adapting AlignByDevice query for new vector
- super.columnNum = alignByDevicePlan.getDataTypes().size();
+ super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device'
this.measurements = alignByDevicePlan.getMeasurements();
- this.devices = alignByDevicePlan.getDevices();
- this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
+ this.paths = alignByDevicePlan.getDeduplicatePaths();
+ this.aggregations = alignByDevicePlan.getAggregations();
this.queryRouter = queryRouter;
this.context = context;
+ this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator();
+ this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex();
this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
switch (alignByDevicePlan.getOperatorType()) {
+ case GROUP_BY_FILL:
+ this.dataSetType = DataSetType.GROUP_BY_FILL;
+ this.groupByFillPlan = alignByDevicePlan.getGroupByFillPlan();
+ this.groupByFillPlan.setAscending(alignByDevicePlan.isAscending());
+ break;
case GROUP_BY_TIME:
- this.dataSetType = DataSetType.GROUPBYTIME;
+ this.dataSetType = DataSetType.GROUP_BY_TIME;
this.groupByTimePlan = alignByDevicePlan.getGroupByTimePlan();
this.groupByTimePlan.setAscending(alignByDevicePlan.isAscending());
break;
@@ -115,7 +119,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
this.curDataSetInitialized = false;
- this.deviceIterator = devices.iterator();
}
public int getPathsNum() {
@@ -133,37 +136,22 @@ public class AlignByDeviceDataSet extends QueryDataSet {
while (deviceIterator.hasNext()) {
currentDevice = deviceIterator.next();
- // get all measurements of current device
- Map<String, MeasurementPath> measurementToPathMap =
- getMeasurementsUnderGivenDevice(currentDevice);
- Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
-
- // extract paths and aggregations queried from all measurements
- // executeColumns is for calculating rowRecord
executeColumns = new ArrayList<>();
List<PartialPath> executePaths = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
- for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
- if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
- continue;
- }
- String column = entry.getKey();
- String measurement = column;
- if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
- measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
- if (measurementOfGivenDevice.contains(measurement)) {
- executeAggregations.add(column.substring(0, column.indexOf('(')));
- }
- }
- if (measurementOfGivenDevice.contains(measurement)) {
- executeColumns.add(column);
- executePaths.add(measurementToPathMap.get(measurement));
+ for (int i : deviceToPathIndex.get(currentDevice)) {
+ executePaths.add(paths.get(i));
+ String executeColumn = paths.get(i).getMeasurement();
+ if (aggregations != null) {
+ executeAggregations.add(aggregations.get(i));
+ executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn);
}
+ executeColumns.add(executeColumn);
}
// get filter to execute for the current device
if (deviceToFilterMap != null) {
- this.expression = deviceToFilterMap.get(currentDevice.getFullPath());
+ this.expression = deviceToFilterMap.get(currentDevice);
}
// for tracing: try to calculate the number of series paths
@@ -173,7 +161,13 @@ public class AlignByDeviceDataSet extends QueryDataSet {
try {
switch (dataSetType) {
- case GROUPBYTIME:
+ case GROUP_BY_FILL:
+ groupByFillPlan.setDeduplicatedPathsAndUpdate(executePaths);
+ groupByFillPlan.setDeduplicatedAggregations(executeAggregations);
+ groupByFillPlan.setExpression(expression);
+ currentDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+ break;
+ case GROUP_BY_TIME:
groupByTimePlan.setDeduplicatedPathsAndUpdate(executePaths);
groupByTimePlan.setDeduplicatedAggregations(executeAggregations);
groupByTimePlan.setExpression(expression);
@@ -190,6 +184,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
currentDataSet = queryRouter.fill(fillQueryPlan, context);
break;
case QUERY:
+ // Group all the subSensors of one vector into one VectorPartialPath
+ executePaths = MetaUtils.groupAlignedPaths(executePaths);
rawDataQueryPlan.setDeduplicatedPathsAndUpdate(executePaths);
rawDataQueryPlan.setExpression(expression);
currentDataSet = queryRouter.rawDataQuery(rawDataQueryPlan, context);
@@ -219,23 +215,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
return false;
}
- /** Get all measurements under given device. */
- protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
- throws IOException {
- try {
- // TODO: Implement this method in Cluster MManager
- Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
- List<MeasurementPath> measurementPaths =
- IoTDB.metaManager.getAllMeasurementByDevicePath(device);
- for (MeasurementPath measurementPath : measurementPaths) {
- measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
- }
- return measurementToPathMap;
- } catch (MetadataException e) {
- throw new IOException("Cannot get node from " + device, e);
- }
- }
-
@Override
public RowRecord nextWithoutConstraint() throws IOException {
RowRecord originRowRecord = currentDataSet.next();
@@ -243,7 +222,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp());
Field deviceField = new Field(TSDataType.TEXT);
- deviceField.setBinaryV(new Binary(currentDevice.getFullPath()));
+ deviceField.setBinaryV(new Binary(currentDevice));
rowRecord.addField(deviceField);
// device field should not be considered as a value field it should affect the WITHOUT NULL
// judgement
@@ -256,23 +235,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
for (String measurement : measurements) {
- MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
- switch (measurementInfo.getMeasurementType()) {
- case Exist:
- if (currentColumnMap.get(measurement) != null) {
- rowRecord.addField(currentColumnMap.get(measurement));
- } else {
- rowRecord.addField(new Field(null));
- }
- break;
- case NonExist:
- rowRecord.addField(new Field(null));
- break;
- case Constant:
- Field res = new Field(TSDataType.TEXT);
- res.setBinaryV(Binary.valueOf(measurement));
- rowRecord.addField(res);
- break;
+ if (currentColumnMap.get(measurement) != null) {
+ rowRecord.addField(currentColumnMap.get(measurement));
+ } else {
+ rowRecord.addField(new Field(null));
}
}
@@ -280,7 +246,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
private enum DataSetType {
- GROUPBYTIME,
+ GROUP_BY_FILL,
+ GROUP_BY_TIME,
AGGREGATE,
FILL,
QUERY
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 3df6ed7..6066cdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -153,7 +153,7 @@ public class SchemaUtils {
}
public static List<TSDataType> getSeriesTypesByPaths(
- List<MeasurementPath> paths, List<String> aggregations) throws MetadataException {
+ List<MeasurementPath> paths, List<String> aggregations) {
List<TSDataType> tsDataTypes = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
String aggrStr = aggregations != null ? aggregations.get(i) : null;
@@ -168,6 +168,15 @@ public class SchemaUtils {
return tsDataTypes;
}
+ public static TSDataType getSeriesTypeByPath(MeasurementPath path, String aggregation) {
+ TSDataType dataType = getAggregationType(aggregation);
+ if (dataType != null) {
+ return dataType;
+ } else {
+ return path.getSeriesType();
+ }
+ }
+
/**
* @param aggregation aggregation function
* @return the data type of the aggregation, or null if the aggregation is null
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
index b1924a6..ec9cb41 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
@@ -39,8 +39,6 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-@Ignore
-// TODO: AlignByDevice
public class IoTDBSessionVectorABDeviceIT {
private static final String ROOT_SG1_D1 = "root.sg1.d1";
private static final String ROOT_SG1_D2 = "root.sg1.d2";
@@ -133,27 +131,14 @@ public class IoTDBSessionVectorABDeviceIT {
assertEquals("s2", dataSet.getColumnNames().get(3));
long time = 1L;
- int count = 0;
- int index = 0;
- String[] deviceArray = {"root.sg1.d2", "root.sg1.d1"};
+ String device = "root.sg1.d1";
while (dataSet.hasNext()) {
- count++;
RowRecord rowRecord = dataSet.next();
assertEquals(time, rowRecord.getTimestamp());
- assertEquals(deviceArray[index], rowRecord.getFields().get(0).getBinaryV().toString());
- if (index == 1) {
- assertEquals(time + 1, rowRecord.getFields().get(1).getLongV());
- assertEquals(time + 2, rowRecord.getFields().get(2).getLongV());
- assertEquals("null", rowRecord.getFields().get(3).getStringValue());
- } else {
- assertEquals(time + 3, rowRecord.getFields().get(1).getLongV());
- assertEquals(time + 4, rowRecord.getFields().get(2).getLongV());
- assertEquals(time + 5, rowRecord.getFields().get(3).getLongV());
- }
- if (count == 100) {
- count = 0;
- index++;
- }
+ assertEquals(device, rowRecord.getFields().get(0).getBinaryV().toString());
+ assertEquals(time + 1, rowRecord.getFields().get(1).getLongV());
+ assertEquals(time + 2, rowRecord.getFields().get(2).getLongV());
+ assertEquals("null", rowRecord.getFields().get(3).getStringValue());
dataSet.next();
}
@@ -164,6 +149,8 @@ public class IoTDBSessionVectorABDeviceIT {
}
}
+ // TODO: remove ignore after implementing aggregation
+ @Ignore
@Test
public void vectorAggregationAlignByDeviceTest() {
try {