You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/09 06:11:59 UTC
[incubator-iotdb] 01/01: count bug
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TyCountBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit cc0c07e1a1fe5252567b2f37e03aef581cd2033b
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jun 9 14:03:03 2020 +0800
count bug
---
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 4 +-
.../db/qp/physical/crud/AlignByDevicePlan.java | 20 +++-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 30 ++++--
.../db/query/dataset/AlignByDeviceDataSet.java | 11 +-
.../IoTDBAggregationAlignByDeviceIT.java | 113 +++++++++++++++++++++
5 files changed, 156 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index cce6140..1f37391 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -283,7 +283,7 @@ public class PlanExecutor implements IPlanExecutor {
protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
- IOException {
+ IOException, MetadataException {
QueryDataSet queryDataSet;
if (queryPlan instanceof AlignByDevicePlan) {
queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
@@ -315,7 +315,7 @@ public class PlanExecutor implements IPlanExecutor {
}
protected AlignByDeviceDataSet getAlignByDeviceDataSet(AlignByDevicePlan plan,
- QueryContext context, IQueryRouter router) {
+ QueryContext context, IQueryRouter router) throws MetadataException { // propagates the MetadataException now declared by the AlignByDeviceDataSet constructor
return new AlignByDeviceDataSet(plan, context, router);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 3edd8b2..dfad76e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -35,6 +35,9 @@ public class AlignByDevicePlan extends QueryPlan {
// to record different kinds of measurement
private Map<String, MeasurementType> measurementTypeMap;
+ // maps each measurement column to the raw (non-aggregated) data type of its series
+ private Map<String, TSDataType> rawTypeMap;
+
private GroupByTimePlan groupByTimePlan;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
@@ -85,6 +88,14 @@ public class AlignByDevicePlan extends QueryPlan {
this.measurementTypeMap = measurementTypeMap;
}
+ public Map<String, TSDataType> getRawTypeMap() { // returns the raw-type map exactly as stored by setRawTypeMap (no copy)
+ return rawTypeMap;
+ }
+
+ public void setRawTypeMap(Map<String, TSDataType> rawTypeMap) { // keeps the caller's map reference as-is (no defensive copy)
+ this.rawTypeMap = rawTypeMap;
+ }
+
public GroupByTimePlan getGroupByTimePlan() {
return groupByTimePlan;
}
@@ -113,11 +124,10 @@ public class AlignByDevicePlan extends QueryPlan {
}
/**
- * Exist: the measurements which don't belong to NonExist and Constant.
- * NonExist: the measurements that do not exist in any device, data type is considered as String.
- * The value is considered as null.
- * Constant: the measurements that have quotation mark. e.g. "abc",'11'.
- * The data type is considered as String and the value is the measurement name.
+ * Exist: a measurement that is neither NonExist nor Constant. NonExist: a measurement that does
+ * not exist in any device; its data type is treated as String and its value as null.
+ * Constant: a measurement written with quotation marks, e.g. "abc", '11'; its data type is
+ * treated as String and its value is the measurement name.
*/
public enum MeasurementType {
Exist, NonExist, Constant;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 644b99b..ec2e987 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -98,6 +98,7 @@ import org.slf4j.Logger;
* Used to convert logical operator to physical plan
*/
public class PhysicalGenerator {
+
private static Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
@@ -291,9 +292,10 @@ public class PhysicalGenerator {
}
- protected List<TSDataType> getSeriesTypes(List<String> paths,
+ protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<String> paths,
String aggregation) throws MetadataException {
- return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+ return new Pair<>(SchemaUtils.getSeriesTypesByString(paths, aggregation), // left: types resolved with the given aggregation
+ SchemaUtils.getSeriesTypesByString(paths, null)); // right: types resolved with no aggregation, used by the caller as the raw series types
}
protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException {
@@ -341,7 +343,8 @@ public class PhysicalGenerator {
if (queryOperator.getLevel() >= 0) {
for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
- if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+ if (!SQLConstant.COUNT
+ .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
throw new QueryProcessException("group by level only support count now.");
}
}
@@ -357,12 +360,13 @@ public class PhysicalGenerator {
((FillQueryPlan) queryPlan).setFillType(queryOperator.getFillTypes());
} else if (queryOperator.hasAggregation()) {
queryPlan = new AggregationPlan();
- ((AggregationPlan)queryPlan).setLevel(queryOperator.getLevel());
+ ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
((AggregationPlan) queryPlan)
.setAggregations(queryOperator.getSelectOperator().getAggregations());
if (queryOperator.getLevel() >= 0) {
for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
- if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+ if (!SQLConstant.COUNT
+ .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
throw new QueryProcessException("group by level only support count now.");
}
}
@@ -387,7 +391,7 @@ public class PhysicalGenerator {
} else if (queryPlan instanceof FillQueryPlan) {
alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
} else if (queryPlan instanceof AggregationPlan) {
- if (((AggregationPlan)queryPlan).getLevel() >= 0) {
+ if (((AggregationPlan) queryPlan).getLevel() >= 0) {
throw new QueryProcessException("group by level does not support align by device now.");
}
alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
@@ -403,6 +407,7 @@ public class PhysicalGenerator {
List<String> measurements = new ArrayList<>();
// to check the same measurement of different devices having the same datatype
Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+ Map<String, TSDataType> rawTypeMap = new HashMap<>();
Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
List<Path> paths = new ArrayList<>();
@@ -436,7 +441,10 @@ public class PhysicalGenerator {
String aggregation =
originAggregations != null && !originAggregations.isEmpty()
? originAggregations.get(i) : null;
- List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation);
+ Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths,
+ aggregation);
+ List<TSDataType> aggregationDataTypes = pair.left;
+ List<TSDataType> rawTypes = pair.right;
for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
Path path = new Path(actualPaths.get(pathIdx));
@@ -449,16 +457,17 @@ public class PhysicalGenerator {
} else {
measurementChecked = path.getMeasurement();
}
- TSDataType dataType = dataTypes.get(pathIdx);
+ TSDataType aggregationDataType = aggregationDataTypes.get(pathIdx);
if (measurementDataTypeMap.containsKey(measurementChecked)) {
- if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
+ if (!aggregationDataType.equals(measurementDataTypeMap.get(measurementChecked))) {
throw new QueryProcessException(
"The data types of the same measurement column should be the same across "
+ "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+ "SQL document.");
}
} else {
- measurementDataTypeMap.put(measurementChecked, dataType);
+ measurementDataTypeMap.put(measurementChecked, aggregationDataType);
+ rawTypeMap.put(measurementChecked, rawTypes.get(pathIdx));
}
// update measurementSetOfGivenSuffix and Normal measurement
@@ -500,6 +509,7 @@ public class PhysicalGenerator {
alignByDevicePlan.setDevices(devices);
alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+ alignByDevicePlan.setRawTypeMap(rawTypeMap);
alignByDevicePlan.setPaths(paths);
// get deviceToFilterMap
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 438f3a8..7436bdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -62,7 +62,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private List<String> devices;
private Map<String, IExpression> deviceToFilterMap;
private Map<String, MeasurementType> measurementTypeMap;
- private Map<String, TSDataType> measurementDataTpeMap;
+ private Map<String, TSDataType> measurementDataTypeMap;
private GroupByTimePlan groupByTimePlan;
private FillQueryPlan fillQueryPlan;
@@ -76,12 +76,12 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private List<String> executeColumns;
public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
- IQueryRouter queryRouter) {
+ IQueryRouter queryRouter) throws MetadataException {
super(null, alignByDevicePlan.getDataTypes());
this.measurements = alignByDevicePlan.getMeasurements();
this.devices = alignByDevicePlan.getDevices();
- this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
+ this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap();
this.queryRouter = queryRouter;
this.context = context;
this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
@@ -95,6 +95,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
case AGGREGATION:
this.dataSetType = DataSetType.AGGREGATE;
this.aggregationPlan = alignByDevicePlan.getAggregationPlan();
+ this.measurementDataTypeMap = alignByDevicePlan.getRawTypeMap();
break;
case FILL:
this.dataSetType = DataSetType.FILL;
@@ -127,7 +128,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
List<Path> executePaths = new ArrayList<>();
List<TSDataType> tsDataTypes = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
- for (String column : measurementDataTpeMap.keySet()) {
+ for (String column : measurementDataTypeMap.keySet()) {
String measurement = column;
if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
@@ -138,7 +139,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
if (measurementOfGivenDevice.contains(measurement)) {
executeColumns.add(column);
executePaths.add(new Path(currentDevice, measurement));
- tsDataTypes.add(measurementDataTpeMap.get(column));
+ tsDataTypes.add(measurementDataTypeMap.get(column));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
new file mode 100644
index 0000000..3655fe8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for the count aggregation combined with ALIGN BY DEVICE.
+ *
+ * <p>Each test run starts a fresh environment, loads {@link #dataSet} over JDBC and then checks
+ * the result of {@code SELECT count(*) from root align by device}.
+ */
+public class IoTDBAggregationAlignByDeviceIT {
+
+  // Two devices (d1, d2) with three series each. Timestamps 1-3 are written and flushed per
+  // device, then written again with different values at the same timestamps without a flush.
+  // The test expects a count of 3 for every series.
+  private static final String[] dataSet = new String[]{
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+  };
+
+  /** Starts the test environment, registers the JDBC driver and loads the test data. */
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  /** Cleans the test environment after each test. */
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Runs {@code SELECT count(*) from root align by device} and expects one row per device, in the
+   * order d1 then d2, each reporting a count of 3 for s1, s2 and s3.
+   */
+  @Test
+  public void test() throws SQLException {
+    String[] retArray = new String[]{"root.sg1.d1,3,3,3", "root.sg1.d2,3,3,3",};
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement
+          .execute("SELECT count(*) from root align by device");
+
+      Assert.assertTrue(hasResultSet);
+
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString("Device") + "," + resultSet.getString(count("s1"))
+                  + "," + resultSet.getString(count("s2")) + "," + resultSet
+                  .getString(count("s3"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        // every expected row must have been returned, not just a matching prefix
+        Assert.assertEquals(retArray.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Executes every statement of {@link #dataSet} over a JDBC connection.
+   *
+   * <p>Any failure aborts the test immediately instead of being swallowed; otherwise the test
+   * would run against missing data and fail later with a misleading count mismatch.
+   */
+  private void prepareData() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : dataSet) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}