You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/01/17 13:25:02 UTC

[iotdb] branch alignbydevice created (now eacad16)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at eacad16  fix tests

This branch includes the following new commits:

     new eacad16  fix tests

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix tests

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eacad168257c5154cf75b0be38aa0050c3bb972e
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Jan 17 21:24:13 2022 +0800

    fix tests
---
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 46 ++++++++++++++++++++++
 .../iotdb/db/qp/logical/crud/QueryOperator.java    | 29 ++++++++------
 .../db/qp/physical/crud/AlignByDevicePlan.java     | 14 +++++++
 3 files changed, 77 insertions(+), 12 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 2b1509a..4e6c6b3 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -305,6 +305,52 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  public void selectSlimitTest2() {
+    String[] retArray =
+        new String[] {
+          "1,root.vehicle.d0,null,101,1101,",
+        };
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "select * from root.vehicle.d0 limit 1 slimit 3 soffset 1 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        List<Integer> actualIndexToExpectedIndexList =
+            checkHeader(
+                resultSetMetaData,
+                "Time,Device,s4,s0,s1",
+                new int[] {
+                  Types.TIMESTAMP, Types.VARCHAR, Types.BOOLEAN, Types.INTEGER, Types.BIGINT,
+                });
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] expectedStrings = retArray[cnt].split(",");
+          StringBuilder expectedBuilder = new StringBuilder();
+          StringBuilder actualBuilder = new StringBuilder();
+          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+            actualBuilder.append(resultSet.getString(i)).append(",");
+            expectedBuilder
+                .append(expectedStrings[actualIndexToExpectedIndexList.get(i - 1)])
+                .append(",");
+          }
+          Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
   public void selectSlimitTest() {
     String[] retArray =
         new String[] {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 4653ed6..4032f86 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -243,8 +243,7 @@ public class QueryOperator extends Operator {
 
     // remove stars in fromPaths and get deviceId with deduplication
     List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
-    List<ResultColumn> resultColumns =
-        convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
+    List<ResultColumn> resultColumns = selectComponent.getResultColumns();
     List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
@@ -305,10 +304,17 @@ public class QueryOperator extends Operator {
         // therefore the final measurements is [s1,s2,s3,s1].
         measurements.addAll(measurementSetOfGivenSuffix);
       }
+
+      if (specialClauseComponent.hasSlimit()
+          && measurements.size()
+              >= specialClauseComponent.getSeriesLimit()
+                  + specialClauseComponent.getSeriesOffset()) {
+        break;
+      }
     }
 
     // assigns to alignByDevicePlan
-    alignByDevicePlan.setMeasurements(measurements);
+    alignByDevicePlan.setMeasurements(convertSpecialClauseValues(alignByDevicePlan, measurements));
     alignByDevicePlan.setPaths(paths);
     alignByDevicePlan.setAggregations(aggregations);
     alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
@@ -345,16 +351,16 @@ public class QueryOperator extends Operator {
     }
   }
 
-  private List<ResultColumn> convertSpecialClauseValues(
-      QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
+  private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
+      throws QueryProcessException {
     convertSpecialClauseValues(queryPlan);
     // sLimit trim on the measurementColumnList
     if (specialClauseComponent.hasSlimit()) {
       int seriesSLimit = specialClauseComponent.getSeriesLimit();
       int seriesOffset = specialClauseComponent.getSeriesOffset();
-      return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
+      return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
     }
-    return resultColumns;
+    return measurements;
   }
 
   private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -391,10 +397,9 @@ public class QueryOperator extends Operator {
     return initialMeasurement;
   }
 
-  private List<ResultColumn> slimitTrimColumn(
-      List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
-      throws QueryProcessException {
-    int size = resultColumns.size();
+  private List<String> slimitTrimColumn(
+      List<String> measurements, int seriesLimit, int seriesOffset) throws QueryProcessException {
+    int size = measurements.size();
 
     // check parameter range
     if (seriesOffset >= size) {
@@ -408,7 +413,7 @@ public class QueryOperator extends Operator {
     }
 
     // trim seriesPath list
-    return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
+    return new ArrayList<>(measurements.subList(seriesOffset, endPosition));
   }
 
   // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 7132cfc..43d124c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -68,9 +69,14 @@ public class AlignByDevicePlan extends QueryPlan {
   public void deduplicate(PhysicalGenerator physicalGenerator) {
     Set<String> pathWithAggregationSet = new LinkedHashSet<>();
     List<String> deduplicatedAggregations = new ArrayList<>();
+    HashSet<String> measurements = new HashSet<>(getMeasurements());
     for (int i = 0; i < paths.size(); i++) {
       PartialPath path = paths.get(i);
       String aggregation = aggregations != null ? aggregations.get(i) : null;
+      String measurementWithAggregation = getMeasurementStrWithAggregation(path, aggregation);
+      if (!measurements.contains(measurementWithAggregation)) {
+        continue;
+      }
       String pathStrWithAggregation = getPathStrWithAggregation(path, aggregation);
       if (!pathWithAggregationSet.contains(pathStrWithAggregation)) {
         pathWithAggregationSet.add(pathStrWithAggregation);
@@ -209,6 +215,14 @@ public class AlignByDevicePlan extends QueryPlan {
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
 
+  private String getMeasurementStrWithAggregation(PartialPath path, String aggregation) {
+    String measurement = path.getMeasurement();
+    if (aggregation != null) {
+      measurement = aggregation + "(" + measurement + ")";
+    }
+    return measurement;
+  }
+
   private String getPathStrWithAggregation(PartialPath path, String aggregation) {
     String initialPath = path.getFullPath();
     if (aggregation != null) {