You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/15 08:53:54 UTC

[iotdb] branch iotdb-2375 created (now 1b0b757)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-2375
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 1b0b757  [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set

This branch includes the following new commits:

     new 1b0b757  [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2375
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1b0b7574948b8c3960d49bb842594cbacbb77e84
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Jan 15 16:52:54 2022 +0800

    [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set
---
 .../db/integration/IoTDBContinuousQueryIT.java     | 191 ++++++++++++++++++---
 .../cq/ContinuousQuerySchemaCheckTask.java         |   9 +-
 .../db/{ => engine}/cq/ContinuousQueryService.java |   2 +-
 .../db/{ => engine}/cq/ContinuousQueryTask.java    | 155 ++++-------------
 .../cq/ContinuousQueryTaskPoolManager.java         |   2 +-
 .../selectinto/InsertTabletPlansIterator.java      |  51 +++++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   9 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 .../iotdb/db/query/expression/ResultColumn.java    |   5 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   2 +-
 12 files changed, 275 insertions(+), 157 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index 8a875db..9dcc99e 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.integration;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.jdbc.Config;
@@ -53,6 +55,8 @@ public class IoTDBContinuousQueryIT {
   private Connection connection;
   private volatile Exception exception = null;
 
+  private PartialPath[] partialPathArray;
+
   private final Thread dataGenerator =
       new Thread() {
         @Override
@@ -62,11 +66,13 @@ public class IoTDBContinuousQueryIT {
                       Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
               Statement statement = connection.createStatement()) {
             do {
-              for (String timeSeries : timeSeriesArray) {
+              for (PartialPath partialPath : partialPathArray) {
                 statement.execute(
                     String.format(
-                        "insert into %s(timestamp, temperature) values(now(), %.3f)",
-                        timeSeries, 200 * Math.random()));
+                        "insert into %s(timestamp, %s) values(now(), %.3f)",
+                        partialPath.getDevicePath(),
+                        partialPath.getMeasurement(),
+                        200 * Math.random()));
               }
             } while (!isInterrupted());
           } catch (Exception e) {
@@ -84,22 +90,23 @@ public class IoTDBContinuousQueryIT {
     dataGenerator.join();
   }
 
-  private final String[] timeSeriesArray = {
-    "root.ln.wf01.wt01.ws01",
-    "root.ln.wf01.wt01.ws02",
-    "root.ln.wf01.wt02.ws01",
-    "root.ln.wf01.wt02.ws02",
-    "root.ln.wf02.wt01.ws01",
-    "root.ln.wf02.wt01.ws02",
-    "root.ln.wf02.wt02.ws01",
-    "root.ln.wf02.wt02.ws02"
-  };
-
-  private void createTimeSeries() throws SQLException {
-    for (String timeSeries : timeSeriesArray) {
+  private void createTimeSeries(String[] timeSeriesArray) throws SQLException {
+    initPartialPaths(timeSeriesArray);
+    for (PartialPath partialPath : partialPathArray) {
       statement.execute(
           String.format(
-              "create timeseries %s.temperature with datatype=FLOAT,encoding=RLE", timeSeries));
+              "create timeseries %s with datatype=FLOAT,encoding=RLE", partialPath.getFullPath()));
+    }
+  }
+
+  private void initPartialPaths(String[] timeSeriesArray) {
+    partialPathArray = new PartialPath[timeSeriesArray.length];
+    for (int i = 0; i < timeSeriesArray.length; ++i) {
+      try {
+        partialPathArray[i] = new PartialPath(timeSeriesArray[i]);
+      } catch (IllegalPathException e) {
+        fail(e.getMessage());
+      }
     }
   }
 
@@ -120,7 +127,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testCreateAndDropContinuousQuery() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
 
     statement.execute(
         "CREATE CONTINUOUS QUERY cq1 "
@@ -182,8 +199,18 @@ public class IoTDBContinuousQueryIT {
   }
 
   @Test
-  public void testContinuousQueryResultSeries() throws Exception {
-    createTimeSeries();
+  public void testContinuousQueryResultSeriesWithLevels() throws Exception {
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     Thread.sleep(500);
@@ -215,8 +242,95 @@ public class IoTDBContinuousQueryIT {
   }
 
   @Test
+  public void testContinuousQueryResultSeriesWithDuplicatedTargetPaths() throws Exception {
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.ws02.temperature",
+          "root.ln.wf01.ws01.temperature",
+          "root.ln.wf02.wt01.temperature",
+          "root.ln.wf02.wt02.temperature",
+        });
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    try {
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq1 "
+              + "BEGIN SELECT avg(temperature) INTO root.target.{2}.{3}.avg FROM root.ln.*.* "
+              + "GROUP BY time(1s) END");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("duplicated"));
+    }
+  }
+
+  @Test
+  public void testContinuousQueryResultSeriesWithoutLevels1() throws Exception {
+    String[] timeSeriesArray = new String[30];
+    int wsIndex = 1;
+    for (int i = 1; i <= 30; ++i) {
+      timeSeriesArray[i - 1] =
+          "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+    }
+    createTimeSeries(timeSeriesArray);
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}_avg FROM root.ln.*.* "
+            + "GROUP BY time(1s) END");
+
+    Thread.sleep(5500);
+
+    checkShowTimeSeriesCount(2 * timeSeriesArray.length);
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResultSeriesWithoutLevels2() throws Exception {
+    String[] timeSeriesArray = new String[30];
+    int wsIndex = 1;
+    for (int i = 1; i <= 30; ++i) {
+      timeSeriesArray[i - 1] =
+          "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+    }
+    createTimeSeries(timeSeriesArray);
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}.avg FROM root.ln.*.* "
+            + "GROUP BY time(1s) END");
+
+    Thread.sleep(5500);
+
+    checkShowTimeSeriesCount(2 * timeSeriesArray.length);
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
   public void testInterval1000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -232,7 +346,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testInterval2000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -248,7 +372,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testInterval3000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -345,4 +479,15 @@ public class IoTDBContinuousQueryIT {
       Assert.assertTrue(collect.contains(s));
     }
   }
+
+  private void checkShowTimeSeriesCount(int expected) throws SQLException {
+    Assert.assertTrue(statement.execute("show timeseries"));
+    int autual = 0;
+    try (ResultSet resultSet = statement.getResultSet()) {
+      while (resultSet.next()) {
+        ++autual;
+      }
+    }
+    Assert.assertEquals(expected, autual);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
index 73c0481..0f5dbbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
@@ -17,14 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -47,8 +49,9 @@ public class ContinuousQuerySchemaCheckTask extends ContinuousQueryTask {
 
   /** we only do some checks here. we don't write any data. */
   @Override
-  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
-      throws MetadataException, IOException, ContinuousQueryException {
+  protected void doInsert(
+      String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+      throws MetadataException, StorageEngineException, IOException {
     Set<PartialPath> targetPaths = new HashSet<>(generateTargetPaths(queryDataSet.getPaths()));
     checkTargetPathNumber(queryDataSet, targetPaths);
     checkTargetPathDataType(queryPlan, targetPaths);
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
index 58e11f2..0b44b8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
index f05c977..23bb596 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
@@ -16,28 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.apache.thrift.TException;
@@ -46,8 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -77,11 +78,18 @@ public class ContinuousQueryTask extends WrappedRunnable {
   public void runMayThrow()
       throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
           QueryFilterOptimizationException, MetadataException, TException, SQLException {
+    // construct logical operator
     final String sql = generateSQL();
+    Operator operator = LogicalGenerator.generate(sql, ZoneId.systemDefault());
+    if (!operator.isQuery()) {
+      throw new ContinuousQueryException(
+          String.format("unsupported operation in cq task: %s", operator.getType().name()));
+    }
+    QueryOperator queryOperator = (QueryOperator) operator;
 
     // construct query plan
     final GroupByTimePlan queryPlan =
-        (GroupByTimePlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(sql);
+        (GroupByTimePlan) serviceProvider.getPlanner().operatorToPhysicalPlan(queryOperator);
     if (queryPlan.getDeduplicatedPaths().isEmpty()) {
       if (continuousQueryPlan.isDebug()) {
         LOGGER.info(continuousQueryPlan.getContinuousQueryName() + ": deduplicated paths empty.");
@@ -109,7 +117,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
       }
 
       // insert data into target timeseries
-      doInsert(queryDataSet, queryPlan);
+      doInsert(sql, queryOperator, queryPlan, queryDataSet);
     } finally {
       ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
     }
@@ -128,118 +136,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
         + continuousQueryPlan.getQuerySqlAfterGroupByClause();
   }
 
-  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
-      throws IOException, MetadataException, QueryProcessException, StorageEngineException {
-    int columnSize = queryDataSet.getDataTypes().size();
-    TSDataType dataType =
-        TypeInferenceUtils.getAggrDataType(
-            queryPlan.getAggregations().get(0), queryPlan.getDataTypes().get(0));
-    InsertTabletPlan[] insertTabletPlans =
-        generateInsertTabletPlans(columnSize, queryDataSet, dataType);
-
-    int batchSize =
-        (int)
-            Math.min(
-                EXECUTION_BATCH_SIZE,
-                Math.ceil(
-                    (float) continuousQueryPlan.getForInterval()
-                        / (continuousQueryPlan.getGroupByTimeInterval())));
-    Object[][] columns = constructColumns(columnSize, batchSize, dataType);
-    long[][] timestamps = new long[columnSize][batchSize];
-    int[] rowNums = new int[columnSize];
-
-    boolean hasNext = true;
-    while (hasNext) {
-      int rowNum = 0;
-
-      while (++rowNum <= batchSize) {
-        if (!queryDataSet.hasNext()) {
-          hasNext = false;
-          break;
-        }
-        fillColumns(columns, dataType, queryDataSet.next(), rowNums, timestamps);
-      }
-
-      for (int i = 0; i < columnSize; i++) {
-        if (rowNums[i] > 0) {
-          insertTabletPlans[i].setTimes(timestamps[i]);
-          insertTabletPlans[i].setColumns(columns[i]);
-          insertTabletPlans[i].setRowCount(rowNums[i]);
-          serviceProvider.executeNonQuery(insertTabletPlans[i]);
-        }
-      }
-    }
-  }
-
-  protected InsertTabletPlan[] generateInsertTabletPlans(
-      int columnSize, QueryDataSet result, TSDataType dataType) throws IllegalPathException {
-    List<PartialPath> targetPaths = generateTargetPaths(result.getPaths());
-    InsertTabletPlan[] insertTabletPlans = new InsertTabletPlan[columnSize];
-    String[] measurements = new String[] {targetPaths.get(0).getMeasurement()};
-    List<Integer> dataTypes = Collections.singletonList(dataType.ordinal());
-
-    for (int i = 0; i < columnSize; i++) {
-      insertTabletPlans[i] =
-          new InsertTabletPlan(
-              new PartialPath(targetPaths.get(i).getDevice()), measurements, dataTypes);
-    }
-
-    return insertTabletPlans;
-  }
-
-  protected Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
-    Object[][] columns = new Object[columnSize][1];
-    for (int i = 0; i < columnSize; i++) {
-      switch (dataType) {
-        case DOUBLE:
-          columns[i][0] = new double[fetchSize];
-          break;
-        case INT64:
-          columns[i][0] = new long[fetchSize];
-          break;
-        case INT32:
-          columns[i][0] = new int[fetchSize];
-          break;
-        case FLOAT:
-          columns[i][0] = new float[fetchSize];
-          break;
-        default:
-          break;
-      }
-    }
-    return columns;
-  }
-
-  protected void fillColumns(
-      Object[][] columns,
-      TSDataType dataType,
-      RowRecord record,
-      int[] rowNums,
-      long[][] timestamps) {
-    List<Field> fields = record.getFields();
-    long ts = record.getTimestamp();
-
-    for (int i = 0; i < columns.length; i++) {
-      Field field = fields.get(i);
-      if (field != null) {
-        timestamps[i][rowNums[i]] = ts;
-        switch (dataType) {
-          case DOUBLE:
-            ((double[]) columns[i][0])[rowNums[i]] = field.getDoubleV();
-            break;
-          case INT64:
-            ((long[]) columns[i][0])[rowNums[i]] = field.getLongV();
-            break;
-          case INT32:
-            ((int[]) columns[i][0])[rowNums[i]] = field.getIntV();
-            break;
-          case FLOAT:
-            ((float[]) columns[i][0])[rowNums[i]] = field.getFloatV();
-            break;
-          default:
-        }
-
-        rowNums[i]++;
+  protected void doInsert(
+      String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+      throws MetadataException, QueryProcessException, StorageEngineException, IOException {
+    InsertTabletPlansIterator insertTabletPlansIterator =
+        new InsertTabletPlansIterator(
+            queryPlan,
+            queryDataSet,
+            queryOperator.getFromComponent().getPrefixPaths().get(0),
+            generateTargetPaths(queryDataSet.getPaths()),
+            false);
+    while (insertTabletPlansIterator.hasNext()) {
+      if (!serviceProvider.executeNonQuery(
+          new InsertMultiTabletPlan(insertTabletPlansIterator.next()))) {
+        throw new ContinuousQueryException(
+            String.format(
+                "failed to execute cq task %s, sql: %s",
+                continuousQueryPlan.getContinuousQueryName(), sql));
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
index 5b112d7..000c713 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
index a7b0738..a83c23b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
@@ -25,9 +25,13 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +42,9 @@ import java.util.regex.Pattern;
 
 public class InsertTabletPlansIterator {
 
-  private static final Pattern leveledPathNodePattern = Pattern.compile("\\$\\{\\w+}");
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletPlansIterator.class);
+
+  private static final Pattern LEVELED_PATH_NODE_PATTERN = Pattern.compile("\\$\\{\\w+}");
 
   private final QueryPlan queryPlan;
   private final QueryDataSet queryDataSet;
@@ -80,7 +86,7 @@ public class InsertTabletPlansIterator {
   private PartialPath generateActualIntoPath(int index) throws IllegalPathException {
     String[] nodes = fromPath.getNodes();
     StringBuffer sb = new StringBuffer();
-    Matcher m = leveledPathNodePattern.matcher(intoPaths.get(index).getFullPath());
+    Matcher m = LEVELED_PATH_NODE_PATTERN.matcher(intoPaths.get(index).getFullPath());
     while (m.find()) {
       String param = m.group();
       String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -92,7 +98,7 @@ public class InsertTabletPlansIterator {
 
   private void constructInsertTabletPlanGenerators() {
     final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
-    final List<ResultColumn> resultColumns = queryPlan.getResultColumns();
+    final List<String> sourcePaths = findSourcePaths();
 
     Map<String, InsertTabletPlanGenerator> deviceToPlanGeneratorMap = new HashMap<>();
     for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
@@ -105,13 +111,50 @@ public class InsertTabletPlansIterator {
           .get(device)
           .collectTargetPathInformation(
               intoPaths.get(i).getMeasurement(),
-              sourcePathToQueryDataSetIndex.get(resultColumns.get(i).getResultColumnName()));
+              sourcePathToQueryDataSetIndex.get(sourcePaths.get(i)));
     }
 
     insertTabletPlanGenerators =
         deviceToPlanGeneratorMap.values().toArray(new InsertTabletPlanGenerator[0]);
   }
 
+  private List<String> findSourcePaths() {
+    // sourcePaths can be in queryPlanColumns or in queryDataSetPaths
+    final List<ResultColumn> queryPlanColumns = queryPlan.getResultColumns();
+    final List<Path> queryDataSetPaths = queryDataSet.getPaths();
+
+    final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
+    final List<String> sourcePaths =
+        new ArrayList<>(Math.max(queryPlanColumns.size(), queryDataSetPaths.size()));
+
+    if (queryPlanColumns.size() == intoPaths.size()) {
+      for (ResultColumn resultColumn : queryPlanColumns) {
+        String path = resultColumn.getResultColumnName();
+        if (!sourcePathToQueryDataSetIndex.containsKey(path)) {
+          sourcePaths.clear();
+          break;
+        }
+        sourcePaths.add(path);
+      }
+    }
+
+    if (sourcePaths.isEmpty() && queryDataSetPaths.size() == intoPaths.size()) {
+      for (Path path : queryDataSetPaths) {
+        if (!sourcePathToQueryDataSetIndex.containsKey(path.getFullPath())) {
+          sourcePaths.clear();
+          break;
+        }
+        sourcePaths.add(path.getFullPath());
+      }
+    }
+
+    if (sourcePaths.isEmpty()) {
+      LOGGER.warn("select into: sourcePaths.isEmpty()");
+    }
+
+    return sourcePaths;
+  }
+
   public boolean hasNext() throws IOException {
     return queryDataSet.hasNext();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index f6eb834..6ec6eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.metadata;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 61c1e40..02258df 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -141,4 +141,13 @@ public class Planner {
   public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr) throws QueryProcessException {
     return parseSQLToPhysicalPlan(sqlStr, ZoneId.systemDefault());
   }
+
+  public PhysicalPlan operatorToPhysicalPlan(Operator operator) throws QueryProcessException {
+    // check if there are logical errors
+    LogicalChecker.check(operator);
+    // optimize the logical operator
+    operator = logicalOptimize(operator);
+    // from logical operator to physical plan
+    return new PhysicalGenerator().transformToPhysicalPlan(operator);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 0966e6b..3e05f50 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -29,12 +29,12 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
index 8be34eb..51195d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
@@ -154,6 +154,11 @@ public class ResultColumn {
   }
 
   @Override
+  public String toString() {
+    return "ResultColumn{" + "expression=" + expression + ", alias='" + alias + '\'' + '}';
+  }
+
+  @Override
   public final int hashCode() {
     return alias == null ? getResultColumnName().hashCode() : alias.hashCode();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 876272f..7e90950 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.ConfigurationException;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 2db631c..5256df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
 import org.apache.iotdb.db.exception.StorageEngineException;