You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/10 12:25:52 UTC

[iotdb] 01/01: [IOTDB-2262][Aligned timeseries] support in select ... into ... clauses

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2262
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ff30d20efd7bb7dda73e4d631ba7cf28d25971d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jan 10 20:25:03 2022 +0800

    [IOTDB-2262][Aligned timeseries] support  in select ... into ... clauses
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  2 +-
 docs/UserGuide/Advanced-Features/Select-Into.md    | 19 +++++++-
 docs/zh/UserGuide/Advanced-Features/Select-Into.md | 20 +++++++-
 .../iotdb/db/integration/IoTDBSelectIntoIT.java    | 55 ++++++++++++++++++++++
 .../selectinto/InsertTabletPlanGenerator.java      |  8 +++-
 .../selectinto/InsertTabletPlansIterator.java      |  8 +++-
 .../db/qp/logical/crud/SelectIntoOperator.java     | 10 +++-
 .../iotdb/db/qp/physical/crud/SelectIntoPlan.java  | 18 ++++++-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  1 +
 .../db/service/thrift/impl/TSServiceImpl.java      |  6 +--
 10 files changed, 135 insertions(+), 12 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index eb58825..de38351 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -311,7 +311,7 @@ selectStatement
     ;
 
 intoClause
-    : INTO intoPath (COMMA intoPath)*
+    : INTO ALIGNED? intoPath (COMMA intoPath)*
     ;
 
 intoPath
diff --git a/docs/UserGuide/Advanced-Features/Select-Into.md b/docs/UserGuide/Advanced-Features/Select-Into.md
index da180ec..d778a74 100644
--- a/docs/UserGuide/Advanced-Features/Select-Into.md
+++ b/docs/UserGuide/Advanced-Features/Select-Into.md
@@ -51,7 +51,7 @@ The `intoClause` is the mark clause for query write-back.
 
 ```sql
 intoClause
-  : INTO intoPath (COMMA intoPath)*
+  : INTO ALIGNED? intoPath (COMMA intoPath)*
   ;
 
 intoPath
@@ -115,6 +115,23 @@ For example, for the path `root.sg1.d1.v1`,  `${1}` means `sg1`,  `${2}` means `
 
 
 
+**You can specify whether the target timeseries are aligned via the keyword `ALIGNED`.**
+
+When the target aligned timeseries already exist, you need to ensure that the data types of the source and target timeseries match.
+
+When the target aligned timeseries do not exist, the system will automatically create them.
+
+
+   * Example:
+
+     ```sql
+     select s1, s2, s3
+     into aligned root.sg.d2.t1, root.sg.d2.t2, root.sg.d2.t3
+     from root.sg.d1
+     ```
+
+
+
 ### Supported Query Types
 
 **Note that except for the following types of queries, other types of queries (such as `LAST` queries) are not supported. **
diff --git a/docs/zh/UserGuide/Advanced-Features/Select-Into.md b/docs/zh/UserGuide/Advanced-Features/Select-Into.md
index f3e31df..feff057 100644
--- a/docs/zh/UserGuide/Advanced-Features/Select-Into.md
+++ b/docs/zh/UserGuide/Advanced-Features/Select-Into.md
@@ -51,7 +51,7 @@ specialClause?
 
 ```sql
 intoClause
-  : INTO intoPath (COMMA intoPath)*
+  : INTO ALIGNED? intoPath (COMMA intoPath)*
   ;
 
 intoPath
@@ -115,6 +115,24 @@ intoPath
 
 
 
+**您可以通过关键词  `ALIGNED` 指定 `intoPath`(目标序列)是否为一个对齐时间序列。**
+
+当目标序列存在时,您需要保证源序列和目标时间序列的类型匹配。
+
+当目标序列不存在时,系统将自动创建一个新的目标对齐时间序列。
+
+
+  * 例子:
+
+    ```sql
+    select s1, s2, s3
+    into aligned root.sg.d2.t1, root.sg.d2.t2, root.sg.d2.t3
+    from root.sg.d1
+    ```
+
+
+
+
 ### 支持写回的查询类型
 
 **注意,除了下述类型的查询,其余类型的查询(如`LAST`查询)都不被支持。**
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSelectIntoIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSelectIntoIT.java
index 283f1db..a8febac 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSelectIntoIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSelectIntoIT.java
@@ -361,6 +361,61 @@ public class IoTDBSelectIntoIT {
   }
 
   @Test
+  public void testSelectIntoAlignedTimeSeriesCorrectly() {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "select s1, s1 "
+              + "into aligned root.sg.`aligned`.s1s2, root.sg.`aligned`.s1s3 "
+              + "from root.sg.d1 "
+              + "where time <= 2");
+      statement.execute(
+          "select s1, s1 "
+              + "into aligned root.sg.`aligned`.s1s2, root.sg.`aligned`.s1s3 "
+              + "from root.sg.d1 "
+              + "where time > 2");
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select s1s2, s1s3 from root.sg.`aligned`")) {
+        assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+        for (int i = 1; i < INSERTION_SQLS.length; ++i) {
+          assertTrue(resultSet.next());
+          for (int j = 0; j < 2 + 1; ++j) {
+            assertEquals(resultSet.getString(2), resultSet.getString(3));
+          }
+        }
+
+        assertFalse(resultSet.next());
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testSelectIntoAlignedTimeSeriesWithUnmatchedTypes() {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("create aligned timeseries root.sg.`aligned`(s1 TEXT, s2 TEXT)");
+      statement.execute(
+          "select s1, s1 "
+              + "into aligned root.sg.`aligned`.s1, root.sg.`aligned`.s2 "
+              + "from root.sg.d1");
+      fail();
+    } catch (SQLException throwable) {
+      assertTrue(
+          throwable
+              .getMessage()
+              .contains("failed to insert measurements [s1, s2] caused by DataType mismatch"));
+    }
+  }
+
+  @Test
   public void testGroupByQuery() {
     try (Connection connection =
             DriverManager.getConnection(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlanGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlanGenerator.java
index b0c83b5..6847a8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlanGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlanGenerator.java
@@ -43,6 +43,7 @@ public class InsertTabletPlanGenerator {
   private final List<String> targetMeasurementIds;
 
   private final int tabletRowLimit;
+  private final boolean isIntoPathsAligned;
 
   // the following fields are used to construct plan
   private int rowCount;
@@ -53,12 +54,15 @@ public class InsertTabletPlanGenerator {
 
   private int numberOfInitializedColumns;
 
-  public InsertTabletPlanGenerator(String targetDevice, int tabletRowLimit) {
+  public InsertTabletPlanGenerator(
+      String targetDevice, int tabletRowLimit, boolean isIntoPathsAligned) {
     this.targetDevice = targetDevice;
     queryDataSetIndexes = new ArrayList<>();
     targetMeasurementIds = new ArrayList<>();
 
     this.tabletRowLimit = tabletRowLimit;
+
+    this.isIntoPathsAligned = isIntoPathsAligned;
   }
 
   public void collectTargetPathInformation(String targetMeasurementId, int queryDataSetIndex) {
@@ -206,7 +210,7 @@ public class InsertTabletPlanGenerator {
 
     InsertTabletPlan insertTabletPlan =
         new InsertTabletPlan(new PartialPath(targetDevice), nonEmptyColumnNames);
-    insertTabletPlan.setAligned(false);
+    insertTabletPlan.setAligned(isIntoPathsAligned);
     insertTabletPlan.setRowCount(rowCount);
 
     if (countOfNonEmptyColumns != columns.length) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
index 20ba4a0..a7b0738 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
@@ -45,6 +45,7 @@ public class InsertTabletPlansIterator {
 
   private final PartialPath fromPath;
   private final List<PartialPath> intoPaths;
+  private final boolean isIntoPathsAligned;
 
   private final int tabletRowLimit;
 
@@ -54,12 +55,14 @@ public class InsertTabletPlansIterator {
       QueryPlan queryPlan,
       QueryDataSet queryDataSet,
       PartialPath fromPath,
-      List<PartialPath> intoPaths)
+      List<PartialPath> intoPaths,
+      boolean isIntoPathsAligned)
       throws IllegalPathException {
     this.queryPlan = queryPlan;
     this.queryDataSet = queryDataSet;
     this.fromPath = fromPath;
     this.intoPaths = intoPaths;
+    this.isIntoPathsAligned = isIntoPathsAligned;
 
     tabletRowLimit =
         IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
@@ -95,7 +98,8 @@ public class InsertTabletPlansIterator {
     for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
       String device = intoPaths.get(i).getDevice();
       if (!deviceToPlanGeneratorMap.containsKey(device)) {
-        deviceToPlanGeneratorMap.put(device, new InsertTabletPlanGenerator(device, tabletRowLimit));
+        deviceToPlanGeneratorMap.put(
+            device, new InsertTabletPlanGenerator(device, tabletRowLimit, isIntoPathsAligned));
       }
       deviceToPlanGeneratorMap
           .get(device)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectIntoOperator.java
index 9ede267..09231c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectIntoOperator.java
@@ -37,6 +37,7 @@ public class SelectIntoOperator extends Operator {
   private QueryOperator queryOperator;
 
   private List<PartialPath> intoPaths;
+  private boolean isIntoPathsAligned;
 
   public SelectIntoOperator() {
     super(SQLConstant.TOK_SELECT_INTO);
@@ -52,7 +53,10 @@ public class SelectIntoOperator extends Operator {
           "select into: the number of source paths and the number of target paths should be the same.");
     }
     return new SelectIntoPlan(
-        queryPlan, queryOperator.getFromComponent().getPrefixPaths().get(0), intoPaths);
+        queryPlan,
+        queryOperator.getFromComponent().getPrefixPaths().get(0),
+        intoPaths,
+        isIntoPathsAligned);
   }
 
   public void check() throws LogicalOperatorException {
@@ -107,4 +111,8 @@ public class SelectIntoOperator extends Operator {
   public void setIntoPaths(List<PartialPath> intoPaths) {
     this.intoPaths = intoPaths;
   }
+
+  public void setIntoPathsAligned(boolean isIntoPathsAligned) {
+    this.isIntoPathsAligned = isIntoPathsAligned;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
index 2221e6c..1832711 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
@@ -36,16 +36,22 @@ public class SelectIntoPlan extends PhysicalPlan {
   private QueryPlan queryPlan;
   private PartialPath fromPath;
   private List<PartialPath> intoPaths;
+  private boolean isIntoPathsAligned;
 
   public SelectIntoPlan() {
     super(OperatorType.SELECT_INTO);
   }
 
-  public SelectIntoPlan(QueryPlan queryPlan, PartialPath fromPath, List<PartialPath> intoPaths) {
+  public SelectIntoPlan(
+      QueryPlan queryPlan,
+      PartialPath fromPath,
+      List<PartialPath> intoPaths,
+      boolean isIntoPathsAligned) {
     super(OperatorType.SELECT_INTO);
     this.queryPlan = queryPlan;
     this.fromPath = fromPath;
     this.intoPaths = intoPaths;
+    this.isIntoPathsAligned = isIntoPathsAligned;
   }
 
   @Override
@@ -65,6 +71,8 @@ public class SelectIntoPlan extends PhysicalPlan {
     for (PartialPath intoPath : intoPaths) {
       putString(outputStream, intoPath.getFullPath());
     }
+
+    outputStream.writeByte(isIntoPathsAligned ? 1 : 0);
   }
 
   @Override
@@ -79,6 +87,8 @@ public class SelectIntoPlan extends PhysicalPlan {
     for (PartialPath intoPath : intoPaths) {
       putString(buffer, intoPath.getFullPath());
     }
+
+    buffer.put((byte) (isIntoPathsAligned ? 1 : 0));
   }
 
   @Override
@@ -92,6 +102,8 @@ public class SelectIntoPlan extends PhysicalPlan {
     for (int i = 0; i < intoPathsSize; ++i) {
       intoPaths.add(new PartialPath(readString(buffer)));
     }
+
+    isIntoPathsAligned = buffer.get() == (byte) 1;
   }
 
   /** mainly for query auth. */
@@ -111,4 +123,8 @@ public class SelectIntoPlan extends PhysicalPlan {
   public List<PartialPath> getIntoPaths() {
     return intoPaths;
   }
+
+  public boolean isIntoPathsAligned() {
+    return isIntoPathsAligned;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 43137e5..533ae4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1039,6 +1039,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
       intoPaths.add(parseIntoPath(ctx.intoClause().intoPath(i)));
     }
     selectIntoOperator.setIntoPaths(intoPaths);
+    selectIntoOperator.setIntoPathsAligned(ctx.intoClause().ALIGNED() != null);
     return selectIntoOperator;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 1ea2833..2467f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -841,18 +841,18 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     queryFrequencyRecorder.incrementAndGet();
     AUDIT_LOGGER.debug(
         "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
-    if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
+    if (queryPlan.isEnableTracing()) {
       tracingManager.setSeriesPathNum(queryId, queryPlan.getPaths().size());
     }
 
     try {
-
       InsertTabletPlansIterator insertTabletPlansIterator =
           new InsertTabletPlansIterator(
               queryPlan,
               createQueryDataSet(context, queryPlan, fetchSize),
               selectIntoPlan.getFromPath(),
-              selectIntoPlan.getIntoPaths());
+              selectIntoPlan.getIntoPaths(),
+              selectIntoPlan.isIntoPathsAligned());
       while (insertTabletPlansIterator.hasNext()) {
         TSStatus executionStatus =
             insertTabletsInternally(insertTabletPlansIterator.next(), sessionId);