You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/09 06:11:58 UTC

[incubator-iotdb] branch TyCountBug created (now cc0c07e)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch TyCountBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at cc0c07e  count bug

This branch includes the following new commits:

     new cc0c07e  count bug

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: count bug

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TyCountBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit cc0c07e1a1fe5252567b2f37e03aef581cd2033b
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jun 9 14:03:03 2020 +0800

    count bug
---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   4 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  20 +++-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  30 ++++--
 .../db/query/dataset/AlignByDeviceDataSet.java     |  11 +-
 .../IoTDBAggregationAlignByDeviceIT.java           | 113 +++++++++++++++++++++
 5 files changed, 156 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index cce6140..1f37391 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -283,7 +283,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
       throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
-      IOException {
+      IOException, MetadataException {
     QueryDataSet queryDataSet;
     if (queryPlan instanceof AlignByDevicePlan) {
       queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
@@ -315,7 +315,7 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   protected AlignByDeviceDataSet getAlignByDeviceDataSet(AlignByDevicePlan plan,
-      QueryContext context, IQueryRouter router) {
+      QueryContext context, IQueryRouter router) throws MetadataException {
     return new AlignByDeviceDataSet(plan, context, router);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 3edd8b2..dfad76e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -35,6 +35,9 @@ public class AlignByDevicePlan extends QueryPlan {
   // to record different kinds of measurement
   private Map<String, MeasurementType> measurementTypeMap;
 
+  // to record the real type of series
+  private Map<String, TSDataType> rawTypeMap;
+
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
@@ -85,6 +88,14 @@ public class AlignByDevicePlan extends QueryPlan {
     this.measurementTypeMap = measurementTypeMap;
   }
 
+  public Map<String, TSDataType> getRawTypeMap() {
+    return rawTypeMap;
+  }
+
+  public void setRawTypeMap(Map<String, TSDataType> rawTypeMap) {
+    this.rawTypeMap = rawTypeMap;
+  }
+
   public GroupByTimePlan getGroupByTimePlan() {
     return groupByTimePlan;
   }
@@ -113,11 +124,10 @@ public class AlignByDevicePlan extends QueryPlan {
   }
 
   /**
-   * Exist: the measurements which don't belong to NonExist and Constant.
-   * NonExist: the measurements that do not exist in any device, data type is considered as String.
-   * The value is considered as null.
-   * Constant: the measurements that have quotation mark. e.g. "abc",'11'.
-   * The data type is considered as String and the value is the measurement name.
+   * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
+   * that do not exist in any device, data type is considered as String. The value is considered as
+   * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
+   * considered as String and the value is the measurement name.
    */
   public enum MeasurementType {
     Exist, NonExist, Constant;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 644b99b..ec2e987 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -98,6 +98,7 @@ import org.slf4j.Logger;
  * Used to convert logical operator to physical plan
  */
 public class PhysicalGenerator {
+
   private static Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
 
   public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
@@ -291,9 +292,10 @@ public class PhysicalGenerator {
 
   }
 
-  protected List<TSDataType> getSeriesTypes(List<String> paths,
+  protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<String> paths,
       String aggregation) throws MetadataException {
-    return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+    return new Pair<>(SchemaUtils.getSeriesTypesByString(paths, aggregation),
+        SchemaUtils.getSeriesTypesByString(paths, null));
   }
 
   protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException {
@@ -341,7 +343,8 @@ public class PhysicalGenerator {
 
       if (queryOperator.getLevel() >= 0) {
         for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+          if (!SQLConstant.COUNT
+              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
             throw new QueryProcessException("group by level only support count now.");
           }
         }
@@ -357,12 +360,13 @@ public class PhysicalGenerator {
       ((FillQueryPlan) queryPlan).setFillType(queryOperator.getFillTypes());
     } else if (queryOperator.hasAggregation()) {
       queryPlan = new AggregationPlan();
-      ((AggregationPlan)queryPlan).setLevel(queryOperator.getLevel());
+      ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
       ((AggregationPlan) queryPlan)
           .setAggregations(queryOperator.getSelectOperator().getAggregations());
       if (queryOperator.getLevel() >= 0) {
         for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+          if (!SQLConstant.COUNT
+              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
             throw new QueryProcessException("group by level only support count now.");
           }
         }
@@ -387,7 +391,7 @@ public class PhysicalGenerator {
       } else if (queryPlan instanceof FillQueryPlan) {
         alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
       } else if (queryPlan instanceof AggregationPlan) {
-        if (((AggregationPlan)queryPlan).getLevel() >= 0) {
+        if (((AggregationPlan) queryPlan).getLevel() >= 0) {
           throw new QueryProcessException("group by level does not support align by device now.");
         }
         alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
@@ -403,6 +407,7 @@ public class PhysicalGenerator {
       List<String> measurements = new ArrayList<>();
       // to check the same measurement of different devices having the same datatype
       Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+      Map<String, TSDataType> rawTypeMap = new HashMap<>();
       Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
       List<Path> paths = new ArrayList<>();
 
@@ -436,7 +441,10 @@ public class PhysicalGenerator {
             String aggregation =
                 originAggregations != null && !originAggregations.isEmpty()
                     ? originAggregations.get(i) : null;
-            List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation);
+            Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths,
+                aggregation);
+            List<TSDataType> aggregationDataTypes = pair.left;
+            List<TSDataType> rawTypes = pair.right;
             for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
               Path path = new Path(actualPaths.get(pathIdx));
 
@@ -449,16 +457,17 @@ public class PhysicalGenerator {
               } else {
                 measurementChecked = path.getMeasurement();
               }
-              TSDataType dataType = dataTypes.get(pathIdx);
+              TSDataType aggregationDataType = aggregationDataTypes.get(pathIdx);
               if (measurementDataTypeMap.containsKey(measurementChecked)) {
-                if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
+                if (!aggregationDataType.equals(measurementDataTypeMap.get(measurementChecked))) {
                   throw new QueryProcessException(
                       "The data types of the same measurement column should be the same across "
                           + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
                           + "SQL document.");
                 }
               } else {
-                measurementDataTypeMap.put(measurementChecked, dataType);
+                measurementDataTypeMap.put(measurementChecked, aggregationDataType);
+                rawTypeMap.put(measurementChecked, rawTypes.get(pathIdx));
               }
 
               // update measurementSetOfGivenSuffix and Normal measurement
@@ -500,6 +509,7 @@ public class PhysicalGenerator {
       alignByDevicePlan.setDevices(devices);
       alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
       alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+      alignByDevicePlan.setRawTypeMap(rawTypeMap);
       alignByDevicePlan.setPaths(paths);
 
       // get deviceToFilterMap
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 438f3a8..7436bdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -62,7 +62,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> devices;
   private Map<String, IExpression> deviceToFilterMap;
   private Map<String, MeasurementType> measurementTypeMap;
-  private Map<String, TSDataType> measurementDataTpeMap;
+  private Map<String, TSDataType> measurementDataTypeMap;
 
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
@@ -76,12 +76,12 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> executeColumns;
 
   public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
-      IQueryRouter queryRouter) {
+      IQueryRouter queryRouter) throws MetadataException {
     super(null, alignByDevicePlan.getDataTypes());
 
     this.measurements = alignByDevicePlan.getMeasurements();
     this.devices = alignByDevicePlan.getDevices();
-    this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
+    this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap();
     this.queryRouter = queryRouter;
     this.context = context;
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
@@ -95,6 +95,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       case AGGREGATION:
         this.dataSetType = DataSetType.AGGREGATE;
         this.aggregationPlan = alignByDevicePlan.getAggregationPlan();
+        this.measurementDataTypeMap = alignByDevicePlan.getRawTypeMap();
         break;
       case FILL:
         this.dataSetType = DataSetType.FILL;
@@ -127,7 +128,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       List<Path> executePaths = new ArrayList<>();
       List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (String column : measurementDataTpeMap.keySet()) {
+      for (String column : measurementDataTypeMap.keySet()) {
         String measurement = column;
         if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
           measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
@@ -138,7 +139,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         if (measurementOfGivenDevice.contains(measurement)) {
           executeColumns.add(column);
           executePaths.add(new Path(currentDevice, measurement));
-          tsDataTypes.add(measurementDataTpeMap.get(column));
+          tsDataTypes.add(measurementDataTypeMap.get(column));
         }
       }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
new file mode 100644
index 0000000..3655fe8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBAggregationAlignByDeviceIT {
+
+  private static final String[] dataSet = new String[]{
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws SQLException {
+    String[] retArray = new String[]{"root.sg1.d1,3,3,3", "root.sg1.d2,3,3,3",};
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement
+          .execute("SELECT count(*) from root align by device");
+
+      Assert.assertTrue(hasResultSet);
+
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString("Device") + "," + resultSet.getString(count("s1"))
+                  + "," + resultSet.getString(count("s2")) + "," + resultSet
+                  .getString(count("s3"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(retArray.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void prepareData() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : dataSet) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}