You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/22 09:32:42 UTC

[iotdb] 02/02: sort timeseries

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch IOTDB-3883
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 629f1e3e7e65b6afde7de11b27f5d7dd3329c8f3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Fri Jul 22 17:32:17 2022 +0800

    sort timeseries
---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 31 +++++++++++++++++++---
 .../planner/plan/node/process/MultiChildNode.java  | 21 +++++++++++++++
 .../db/mpp/plan/statement/crud/QueryStatement.java |  3 +++
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  8 +++---
 4 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 9279c0ae31..267af92402 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -48,6 +48,8 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -106,6 +108,7 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
 
 /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
@@ -722,10 +725,29 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
   private Analysis analyzeLast(
       Analysis analysis, List<MeasurementPath> allSelectedPath, SchemaTree schemaTree) {
-    Set<Expression> sourceExpressions =
-        allSelectedPath.stream()
-            .map(TimeSeriesOperand::new)
-            .collect(Collectors.toCollection(LinkedHashSet::new));
+    Set<Expression> sourceExpressions;
+    List<SortItem> sortItemList = analysis.getMergeOrderParameter().getSortItemList();
+    if (sortItemList.size() > 0) {
+      checkState(
+          sortItemList.size() == 1 && sortItemList.get(0).getSortKey() == SortKey.TIMESERIES,
+          "Last queries only support sorting by timeseries now.");
+      boolean isAscending = sortItemList.get(0).getOrdering() == Ordering.ASC;
+      sourceExpressions =
+          allSelectedPath.stream()
+              .map(TimeSeriesOperand::new)
+              .sorted(
+                  (o1, o2) ->
+                      isAscending
+                          ? o1.getExpressionString().compareTo(o2.getExpressionString())
+                          : o2.getExpressionString().compareTo(o1.getExpressionString()))
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+    } else {
+      sourceExpressions =
+          allSelectedPath.stream()
+              .map(TimeSeriesOperand::new)
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
     sourceExpressions.forEach(
         expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider));
     analysis.setSourceExpressions(sourceExpressions);
@@ -793,6 +815,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     SchemaTree schemaTree = new SchemaTree();
     schemaTree.setStorageGroups(statement.getStorageGroups());
 
+    analysis.setMergeOrderParameter(new OrderByParameter());
     return analyzeLast(analysis, statement.getSelectedPaths(), schemaTree);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
index 9afce52a03..9e699f370c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 public abstract class MultiChildNode extends ProcessNode {
 
@@ -42,4 +43,24 @@ public abstract class MultiChildNode extends ProcessNode {
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    MultiChildNode that = (MultiChildNode) o;
+    return Objects.equals(children, that.children); // null-safe, as in hashCode()
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), children); // superclass hash combined with children
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 9067f7d55e..0ab276ba33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -298,6 +298,9 @@ public class QueryStatement extends Statement {
         throw new SemanticException(
             "Sorting by device is only supported in ALIGN BY DEVICE queries.");
       }
+      if (isOrderByTime()) {
+        throw new SemanticException("Sorting by time is not yet supported in last queries.");
+      }
     }
 
     if (!isAlignByDevice() && !isLastQuery()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 7e9622ddb4..8898bccbdf 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -113,9 +113,6 @@ public class QueryLogicalPlanUtil {
 
     QueryId queryId = new QueryId("test");
     List<PlanNode> sourceNodeList = new ArrayList<>();
-    sourceNodeList.add(
-        new LastQueryScanNode(
-            queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3")));
     sourceNodeList.add(
         new LastQueryScanNode(
             queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s1")));
@@ -124,7 +121,7 @@ public class QueryLogicalPlanUtil {
             queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s2")));
     sourceNodeList.add(
         new LastQueryScanNode(
-            queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s4")));
+            queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3")));
     sourceNodeList.add(
         new AlignedLastQueryScanNode(
             queryId.genPlanNodeId(),
@@ -139,6 +136,9 @@ public class QueryLogicalPlanUtil {
     sourceNodeList.add(
         new LastQueryScanNode(
             queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s2")));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s4")));
 
     LastQueryNode lastQueryNode =
         new LastQueryNode(