You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/09 06:43:53 UTC

[incubator-iotdb] branch bug/countBug created (now f92c380)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch bug/countBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at f92c380  resolve conflicts

This branch includes the following new commits:

     new f92c380  resolve conflicts

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: resolve conflicts

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch bug/countBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f92c3803fdeed2f77576ec85663b33218125a6c8
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jun 9 14:03:03 2020 +0800

    resolve conflicts
---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  12 +++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  23 +++--
 .../db/query/dataset/AlignByDeviceDataSet.java     |  11 +-
 .../IoTDBAggregationAlignByDeviceIT.java           | 113 +++++++++++++++++++++
 5 files changed, 148 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 831d269..27c31e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -293,7 +293,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
       throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
-      IOException {
+      IOException, MetadataException {
     QueryDataSet queryDataSet;
     if (queryPlan instanceof AlignByDevicePlan) {
       queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 297c7b3..c822a53 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -35,6 +35,10 @@ public class AlignByDevicePlan extends QueryPlan {
   // to record different kinds of measurement
   private Map<String, MeasurementType> measurementTypeMap;
 
+
+  // to record the real type of series
+  private Map<String, TSDataType> rawTypeMap;
+
   private GroupByPlan groupByPlan;
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
@@ -85,6 +89,14 @@ public class AlignByDevicePlan extends QueryPlan {
     this.measurementTypeMap = measurementTypeMap;
   }
 
+  public Map<String, TSDataType> getRawTypeMap() {
+    return rawTypeMap;
+  }
+
+  public void setRawTypeMap(Map<String, TSDataType> rawTypeMap) {
+    this.rawTypeMap = rawTypeMap;
+  }
+
   public GroupByPlan getGroupByPlan() {
     return groupByPlan;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8a1d8e3..4867753 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -276,9 +276,11 @@ public class PhysicalGenerator {
     }
   }
 
-  protected List<TSDataType> getSeriesTypes(List<String> paths, String aggregation)
-      throws MetadataException {
-    return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+
+  protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<String> paths,
+      String aggregation) throws MetadataException {
+    return new Pair<>(SchemaUtils.getSeriesTypesByString(paths, aggregation),
+        SchemaUtils.getSeriesTypesByString(paths, null));
   }
 
   protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException {
@@ -322,6 +324,7 @@ public class PhysicalGenerator {
       }
       ((GroupByPlan) queryPlan)
           .setAggregations(queryOperator.getSelectOperator().getAggregations());
+
     } else if (queryOperator.isFill()) {
       queryPlan = new FillQueryPlan();
       FilterOperator timeFilter = queryOperator.getFilterOperator();
@@ -368,6 +371,7 @@ public class PhysicalGenerator {
       List<String> measurements = new ArrayList<>();
       // to check the same measurement of different devices having the same datatype
       Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+      Map<String, TSDataType> rawTypeMap = new HashMap<>();
       Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
       List<Path> paths = new ArrayList<>();
 
@@ -401,7 +405,10 @@ public class PhysicalGenerator {
             String aggregation =
                 originAggregations != null && !originAggregations.isEmpty()
                     ? originAggregations.get(i) : null;
-            List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation);
+            Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths,
+                aggregation);
+            List<TSDataType> aggregationDataTypes = pair.left;
+            List<TSDataType> rawTypes = pair.right;
             for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
               Path path = new Path(actualPaths.get(pathIdx));
 
@@ -414,16 +421,17 @@ public class PhysicalGenerator {
               } else {
                 measurementChecked = path.getMeasurement();
               }
-              TSDataType dataType = dataTypes.get(pathIdx);
+              TSDataType aggregationDataType = aggregationDataTypes.get(pathIdx);
               if (measurementDataTypeMap.containsKey(measurementChecked)) {
-                if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
+                if (!aggregationDataType.equals(measurementDataTypeMap.get(measurementChecked))) {
                   throw new QueryProcessException(
                       "The data types of the same measurement column should be the same across "
                           + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
                           + "SQL document.");
                 }
               } else {
-                measurementDataTypeMap.put(measurementChecked, dataType);
+                measurementDataTypeMap.put(measurementChecked, aggregationDataType);
+                rawTypeMap.put(measurementChecked, rawTypes.get(pathIdx));
               }
 
               // update measurementSetOfGivenSuffix and Normal measurement
@@ -465,6 +473,7 @@ public class PhysicalGenerator {
       alignByDevicePlan.setDevices(devices);
       alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
       alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+      alignByDevicePlan.setRawTypeMap(rawTypeMap);
       alignByDevicePlan.setPaths(paths);
 
       // get deviceToFilterMap
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 5b4e9b0..be69936 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -62,7 +62,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> devices;
   private Map<String, IExpression> deviceToFilterMap;
   private Map<String, MeasurementType> measurementTypeMap;
-  private Map<String, TSDataType> measurementDataTpeMap;
+  private Map<String, TSDataType> measurementDataTypeMap;
 
   private GroupByPlan groupByPlan;
   private FillQueryPlan fillQueryPlan;
@@ -76,12 +76,12 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> executeColumns;
 
   public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
-      IQueryRouter queryRouter) {
+      IQueryRouter queryRouter) throws MetadataException {
     super(null, alignByDevicePlan.getDataTypes());
 
     this.measurements = alignByDevicePlan.getMeasurements();
     this.devices = alignByDevicePlan.getDevices();
-    this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
+    this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap();
     this.queryRouter = queryRouter;
     this.context = context;
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
@@ -95,6 +95,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       case AGGREGATION:
         this.dataSetType = DataSetType.AGGREGATE;
         this.aggregationPlan = alignByDevicePlan.getAggregationPlan();
+        this.measurementDataTypeMap = alignByDevicePlan.getRawTypeMap();
         break;
       case FILL:
         this.dataSetType = DataSetType.FILL;
@@ -132,7 +133,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       List<Path> executePaths = new ArrayList<>();
       List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (String column : measurementDataTpeMap.keySet()) {
+      for (String column : measurementDataTypeMap.keySet()) {
         String measurement = column;
         if (dataSetType == DataSetType.GROUPBY || dataSetType == DataSetType.AGGREGATE) {
           measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
@@ -143,7 +144,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         if (measurementOfGivenDevice.contains(measurement)) {
           executeColumns.add(column);
           executePaths.add(new Path(currentDevice, measurement));
-          tsDataTypes.add(measurementDataTpeMap.get(column));
+          tsDataTypes.add(measurementDataTypeMap.get(column));
         }
       }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
new file mode 100644
index 0000000..3655fe8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBAggregationAlignByDeviceIT {
+
+  private static final String[] dataSet = new String[]{
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws SQLException {
+    String[] retArray = new String[]{"root.sg1.d1,3,3,3", "root.sg1.d2,3,3,3",};
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement
+          .execute("SELECT count(*) from root align by device");
+
+      Assert.assertTrue(hasResultSet);
+
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString("Device") + "," + resultSet.getString(count("s1"))
+                  + "," + resultSet.getString(count("s2")) + "," + resultSet
+                  .getString(count("s3"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(retArray.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void prepareData() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : dataSet) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}