You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/15 08:53:55 UTC

[iotdb] 01/01: [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2375
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1b0b7574948b8c3960d49bb842594cbacbb77e84
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Jan 15 16:52:54 2022 +0800

    [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set
---
 .../db/integration/IoTDBContinuousQueryIT.java     | 191 ++++++++++++++++++---
 .../cq/ContinuousQuerySchemaCheckTask.java         |   9 +-
 .../db/{ => engine}/cq/ContinuousQueryService.java |   2 +-
 .../db/{ => engine}/cq/ContinuousQueryTask.java    | 155 ++++-------------
 .../cq/ContinuousQueryTaskPoolManager.java         |   2 +-
 .../selectinto/InsertTabletPlansIterator.java      |  51 +++++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   9 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 .../iotdb/db/query/expression/ResultColumn.java    |   5 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   2 +-
 12 files changed, 275 insertions(+), 157 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index 8a875db..9dcc99e 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.integration;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.jdbc.Config;
@@ -53,6 +55,8 @@ public class IoTDBContinuousQueryIT {
   private Connection connection;
   private volatile Exception exception = null;
 
+  private PartialPath[] partialPathArray;
+
   private final Thread dataGenerator =
       new Thread() {
         @Override
@@ -62,11 +66,13 @@ public class IoTDBContinuousQueryIT {
                       Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
               Statement statement = connection.createStatement()) {
             do {
-              for (String timeSeries : timeSeriesArray) {
+              for (PartialPath partialPath : partialPathArray) {
                 statement.execute(
                     String.format(
-                        "insert into %s(timestamp, temperature) values(now(), %.3f)",
-                        timeSeries, 200 * Math.random()));
+                        "insert into %s(timestamp, %s) values(now(), %.3f)",
+                        partialPath.getDevicePath(),
+                        partialPath.getMeasurement(),
+                        200 * Math.random()));
               }
             } while (!isInterrupted());
           } catch (Exception e) {
@@ -84,22 +90,23 @@ public class IoTDBContinuousQueryIT {
     dataGenerator.join();
   }
 
-  private final String[] timeSeriesArray = {
-    "root.ln.wf01.wt01.ws01",
-    "root.ln.wf01.wt01.ws02",
-    "root.ln.wf01.wt02.ws01",
-    "root.ln.wf01.wt02.ws02",
-    "root.ln.wf02.wt01.ws01",
-    "root.ln.wf02.wt01.ws02",
-    "root.ln.wf02.wt02.ws01",
-    "root.ln.wf02.wt02.ws02"
-  };
-
-  private void createTimeSeries() throws SQLException {
-    for (String timeSeries : timeSeriesArray) {
+  private void createTimeSeries(String[] timeSeriesArray) throws SQLException {
+    initPartialPaths(timeSeriesArray);
+    for (PartialPath partialPath : partialPathArray) {
       statement.execute(
           String.format(
-              "create timeseries %s.temperature with datatype=FLOAT,encoding=RLE", timeSeries));
+              "create timeseries %s with datatype=FLOAT,encoding=RLE", partialPath.getFullPath()));
+    }
+  }
+
+  private void initPartialPaths(String[] timeSeriesArray) {
+    partialPathArray = new PartialPath[timeSeriesArray.length];
+    for (int i = 0; i < timeSeriesArray.length; ++i) {
+      try {
+        partialPathArray[i] = new PartialPath(timeSeriesArray[i]);
+      } catch (IllegalPathException e) {
+        fail(e.getMessage());
+      }
     }
   }
 
@@ -120,7 +127,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testCreateAndDropContinuousQuery() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
 
     statement.execute(
         "CREATE CONTINUOUS QUERY cq1 "
@@ -182,8 +199,18 @@ public class IoTDBContinuousQueryIT {
   }
 
   @Test
-  public void testContinuousQueryResultSeries() throws Exception {
-    createTimeSeries();
+  public void testContinuousQueryResultSeriesWithLevels() throws Exception {
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     Thread.sleep(500);
@@ -215,8 +242,95 @@ public class IoTDBContinuousQueryIT {
   }
 
   @Test
+  public void testContinuousQueryResultSeriesWithDuplicatedTargetPaths() throws Exception {
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.ws02.temperature",
+          "root.ln.wf01.ws01.temperature",
+          "root.ln.wf02.wt01.temperature",
+          "root.ln.wf02.wt02.temperature",
+        });
+    startDataGenerator(); // NOTE(review): this test never calls stopDataGenerator()
+
+    Thread.sleep(500);
+
+    try {
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq1 "
+              + "BEGIN SELECT avg(temperature) INTO root.target.{2}.{3}.avg FROM root.ln.*.* "
+              + "GROUP BY time(1s) END");
+    } catch (Exception e) { // NOTE(review): the test also passes when nothing is thrown
+      assertTrue(e.getMessage().contains("duplicated"));
+    }
+  }
+
+  @Test
+  public void testContinuousQueryResultSeriesWithoutLevels1() throws Exception {
+    String[] timeSeriesArray = new String[30];
+    int wsIndex = 1;
+    for (int i = 1; i <= 30; ++i) { // ws1..ws14 go under wf01, ws15..ws30 under wf02
+      timeSeriesArray[i - 1] =
+          "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+    }
+    createTimeSeries(timeSeriesArray);
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}_avg FROM root.ln.*.* "
+            + "GROUP BY time(1s) END");
+
+    Thread.sleep(5500); // wait so the CQ (GROUP BY time(1s)) can run before counting
+
+    checkShowTimeSeriesCount(2 * timeSeriesArray.length); // 30 sources + 30 expected targets
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResultSeriesWithoutLevels2() throws Exception {
+    String[] timeSeriesArray = new String[30];
+    int wsIndex = 1;
+    for (int i = 1; i <= 30; ++i) { // ws1..ws14 go under wf01, ws15..ws30 under wf02
+      timeSeriesArray[i - 1] =
+          "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+    }
+    createTimeSeries(timeSeriesArray);
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}.avg FROM root.ln.*.* "
+            + "GROUP BY time(1s) END");
+
+    Thread.sleep(5500); // wait so the CQ (GROUP BY time(1s)) can run before counting
+
+    checkShowTimeSeriesCount(2 * timeSeriesArray.length); // 30 sources + 30 expected targets
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
   public void testInterval1000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -232,7 +346,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testInterval2000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -248,7 +372,17 @@ public class IoTDBContinuousQueryIT {
 
   @Test
   public void testInterval3000() throws Exception {
-    createTimeSeries();
+    createTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature"
+        });
     startDataGenerator();
 
     statement.execute(
@@ -345,4 +479,15 @@ public class IoTDBContinuousQueryIT {
       Assert.assertTrue(collect.contains(s));
     }
   }
+
+  private void checkShowTimeSeriesCount(int expected) throws SQLException {
+    Assert.assertTrue(statement.execute("show timeseries")); // must yield a result set
+    int actual = 0; // rows returned by "show timeseries"
+    try (ResultSet resultSet = statement.getResultSet()) {
+      while (resultSet.next()) {
+        ++actual;
+      }
+    }
+    Assert.assertEquals(expected, actual);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
index 73c0481..0f5dbbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
@@ -17,14 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -47,8 +49,9 @@ public class ContinuousQuerySchemaCheckTask extends ContinuousQueryTask {
 
   /** we only do some checks here. we don't write any data. */
   @Override
-  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
-      throws MetadataException, IOException, ContinuousQueryException {
+  protected void doInsert(
+      String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+      throws MetadataException, StorageEngineException, IOException {
     Set<PartialPath> targetPaths = new HashSet<>(generateTargetPaths(queryDataSet.getPaths()));
     checkTargetPathNumber(queryDataSet, targetPaths);
     checkTargetPathDataType(queryPlan, targetPaths);
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
index 58e11f2..0b44b8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
index f05c977..23bb596 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
@@ -16,28 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.apache.thrift.TException;
@@ -46,8 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -77,11 +78,18 @@ public class ContinuousQueryTask extends WrappedRunnable {
   public void runMayThrow()
       throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
           QueryFilterOptimizationException, MetadataException, TException, SQLException {
+    // construct logical operator
     final String sql = generateSQL();
+    Operator operator = LogicalGenerator.generate(sql, ZoneId.systemDefault());
+    if (!operator.isQuery()) {
+      throw new ContinuousQueryException(
+          String.format("unsupported operation in cq task: %s", operator.getType().name()));
+    }
+    QueryOperator queryOperator = (QueryOperator) operator;
 
     // construct query plan
     final GroupByTimePlan queryPlan =
-        (GroupByTimePlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(sql);
+        (GroupByTimePlan) serviceProvider.getPlanner().operatorToPhysicalPlan(queryOperator);
     if (queryPlan.getDeduplicatedPaths().isEmpty()) {
       if (continuousQueryPlan.isDebug()) {
         LOGGER.info(continuousQueryPlan.getContinuousQueryName() + ": deduplicated paths empty.");
@@ -109,7 +117,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
       }
 
       // insert data into target timeseries
-      doInsert(queryDataSet, queryPlan);
+      doInsert(sql, queryOperator, queryPlan, queryDataSet);
     } finally {
       ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
     }
@@ -128,118 +136,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
         + continuousQueryPlan.getQuerySqlAfterGroupByClause();
   }
 
-  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
-      throws IOException, MetadataException, QueryProcessException, StorageEngineException {
-    int columnSize = queryDataSet.getDataTypes().size();
-    TSDataType dataType =
-        TypeInferenceUtils.getAggrDataType(
-            queryPlan.getAggregations().get(0), queryPlan.getDataTypes().get(0));
-    InsertTabletPlan[] insertTabletPlans =
-        generateInsertTabletPlans(columnSize, queryDataSet, dataType);
-
-    int batchSize =
-        (int)
-            Math.min(
-                EXECUTION_BATCH_SIZE,
-                Math.ceil(
-                    (float) continuousQueryPlan.getForInterval()
-                        / (continuousQueryPlan.getGroupByTimeInterval())));
-    Object[][] columns = constructColumns(columnSize, batchSize, dataType);
-    long[][] timestamps = new long[columnSize][batchSize];
-    int[] rowNums = new int[columnSize];
-
-    boolean hasNext = true;
-    while (hasNext) {
-      int rowNum = 0;
-
-      while (++rowNum <= batchSize) {
-        if (!queryDataSet.hasNext()) {
-          hasNext = false;
-          break;
-        }
-        fillColumns(columns, dataType, queryDataSet.next(), rowNums, timestamps);
-      }
-
-      for (int i = 0; i < columnSize; i++) {
-        if (rowNums[i] > 0) {
-          insertTabletPlans[i].setTimes(timestamps[i]);
-          insertTabletPlans[i].setColumns(columns[i]);
-          insertTabletPlans[i].setRowCount(rowNums[i]);
-          serviceProvider.executeNonQuery(insertTabletPlans[i]);
-        }
-      }
-    }
-  }
-
-  protected InsertTabletPlan[] generateInsertTabletPlans(
-      int columnSize, QueryDataSet result, TSDataType dataType) throws IllegalPathException {
-    List<PartialPath> targetPaths = generateTargetPaths(result.getPaths());
-    InsertTabletPlan[] insertTabletPlans = new InsertTabletPlan[columnSize];
-    String[] measurements = new String[] {targetPaths.get(0).getMeasurement()};
-    List<Integer> dataTypes = Collections.singletonList(dataType.ordinal());
-
-    for (int i = 0; i < columnSize; i++) {
-      insertTabletPlans[i] =
-          new InsertTabletPlan(
-              new PartialPath(targetPaths.get(i).getDevice()), measurements, dataTypes);
-    }
-
-    return insertTabletPlans;
-  }
-
-  protected Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
-    Object[][] columns = new Object[columnSize][1];
-    for (int i = 0; i < columnSize; i++) {
-      switch (dataType) {
-        case DOUBLE:
-          columns[i][0] = new double[fetchSize];
-          break;
-        case INT64:
-          columns[i][0] = new long[fetchSize];
-          break;
-        case INT32:
-          columns[i][0] = new int[fetchSize];
-          break;
-        case FLOAT:
-          columns[i][0] = new float[fetchSize];
-          break;
-        default:
-          break;
-      }
-    }
-    return columns;
-  }
-
-  protected void fillColumns(
-      Object[][] columns,
-      TSDataType dataType,
-      RowRecord record,
-      int[] rowNums,
-      long[][] timestamps) {
-    List<Field> fields = record.getFields();
-    long ts = record.getTimestamp();
-
-    for (int i = 0; i < columns.length; i++) {
-      Field field = fields.get(i);
-      if (field != null) {
-        timestamps[i][rowNums[i]] = ts;
-        switch (dataType) {
-          case DOUBLE:
-            ((double[]) columns[i][0])[rowNums[i]] = field.getDoubleV();
-            break;
-          case INT64:
-            ((long[]) columns[i][0])[rowNums[i]] = field.getLongV();
-            break;
-          case INT32:
-            ((int[]) columns[i][0])[rowNums[i]] = field.getIntV();
-            break;
-          case FLOAT:
-            ((float[]) columns[i][0])[rowNums[i]] = field.getFloatV();
-            break;
-          default:
-        }
-
-        rowNums[i]++;
+  protected void doInsert(
+      String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+      throws MetadataException, QueryProcessException, StorageEngineException, IOException {
+    InsertTabletPlansIterator insertTabletPlansIterator =
+        new InsertTabletPlansIterator(
+            queryPlan,
+            queryDataSet,
+            queryOperator.getFromComponent().getPrefixPaths().get(0), // first FROM prefix only
+            generateTargetPaths(queryDataSet.getPaths()),
+            false);
+    while (insertTabletPlansIterator.hasNext()) {
+      if (!serviceProvider.executeNonQuery( // a false result aborts this cq task run
+          new InsertMultiTabletPlan(insertTabletPlansIterator.next()))) {
+        throw new ContinuousQueryException(
+            String.format(
+                "failed to execute cq task %s, sql: %s",
+                continuousQueryPlan.getContinuousQueryName(), sql));
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
index 5b112d7..000c713 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
 
 import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
index a7b0738..a83c23b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
@@ -25,9 +25,13 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +42,9 @@ import java.util.regex.Pattern;
 
 public class InsertTabletPlansIterator {
 
-  private static final Pattern leveledPathNodePattern = Pattern.compile("\\$\\{\\w+}");
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletPlansIterator.class);
+
+  private static final Pattern LEVELED_PATH_NODE_PATTERN = Pattern.compile("\\$\\{\\w+}");
 
   private final QueryPlan queryPlan;
   private final QueryDataSet queryDataSet;
@@ -80,7 +86,7 @@ public class InsertTabletPlansIterator {
   private PartialPath generateActualIntoPath(int index) throws IllegalPathException {
     String[] nodes = fromPath.getNodes();
     StringBuffer sb = new StringBuffer();
-    Matcher m = leveledPathNodePattern.matcher(intoPaths.get(index).getFullPath());
+    Matcher m = LEVELED_PATH_NODE_PATTERN.matcher(intoPaths.get(index).getFullPath());
     while (m.find()) {
       String param = m.group();
       String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -92,7 +98,7 @@ public class InsertTabletPlansIterator {
 
   private void constructInsertTabletPlanGenerators() {
     final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
-    final List<ResultColumn> resultColumns = queryPlan.getResultColumns();
+    final List<String> sourcePaths = findSourcePaths();
 
     Map<String, InsertTabletPlanGenerator> deviceToPlanGeneratorMap = new HashMap<>();
     for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
@@ -105,13 +111,50 @@ public class InsertTabletPlansIterator {
           .get(device)
           .collectTargetPathInformation(
               intoPaths.get(i).getMeasurement(),
-              sourcePathToQueryDataSetIndex.get(resultColumns.get(i).getResultColumnName()));
+              sourcePathToQueryDataSetIndex.get(sourcePaths.get(i)));
     }
 
     insertTabletPlanGenerators =
         deviceToPlanGeneratorMap.values().toArray(new InsertTabletPlanGenerator[0]);
   }
 
+  private List<String> findSourcePaths() {
+    // source path names are looked up in queryPlanColumns first, then in queryDataSetPaths
+    final List<ResultColumn> queryPlanColumns = queryPlan.getResultColumns();
+    final List<Path> queryDataSetPaths = queryDataSet.getPaths();
+
+    final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
+    final List<String> sourcePaths =
+        new ArrayList<>(Math.max(queryPlanColumns.size(), queryDataSetPaths.size()));
+
+    if (queryPlanColumns.size() == intoPaths.size()) { // 1st candidate: result column names
+      for (ResultColumn resultColumn : queryPlanColumns) {
+        String path = resultColumn.getResultColumnName();
+        if (!sourcePathToQueryDataSetIndex.containsKey(path)) {
+          sourcePaths.clear(); // one name missing from the index map rejects the candidate
+          break;
+        }
+        sourcePaths.add(path);
+      }
+    }
+
+    if (sourcePaths.isEmpty() && queryDataSetPaths.size() == intoPaths.size()) { // fallback
+      for (Path path : queryDataSetPaths) {
+        if (!sourcePathToQueryDataSetIndex.containsKey(path.getFullPath())) {
+          sourcePaths.clear();
+          break;
+        }
+        sourcePaths.add(path.getFullPath());
+      }
+    }
+
+    if (sourcePaths.isEmpty()) { // nothing resolved: only a warning, the empty list is returned
+      LOGGER.warn("select into: sourcePaths.isEmpty()");
+    }
+
+    return sourcePaths; // NOTE(review): the caller indexes this list by into-path position
+  }
+
   public boolean hasNext() throws IOException {
     return queryDataSet.hasNext();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index f6eb834..6ec6eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.metadata;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 61c1e40..02258df 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -141,4 +141,13 @@ public class Planner {
   public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr) throws QueryProcessException {
     return parseSQLToPhysicalPlan(sqlStr, ZoneId.systemDefault());
   }
+
+  public PhysicalPlan operatorToPhysicalPlan(Operator operator) throws QueryProcessException {
+    // check if there are logical errors
+    LogicalChecker.check(operator);
+    // optimize the logical operator
+    operator = logicalOptimize(operator);
+    // from logical operator to physical plan
+    return new PhysicalGenerator().transformToPhysicalPlan(operator);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 0966e6b..3e05f50 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -29,12 +29,12 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
index 8be34eb..51195d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
@@ -154,6 +154,11 @@ public class ResultColumn {
   }
 
   @Override
+  public String toString() {
+    return "ResultColumn{" + "expression=" + expression + ", alias='" + alias + '\'' + '}';
+  }
+
+  @Override
   public final int hashCode() {
     return alias == null ? getResultColumnName().hashCode() : alias.hashCode();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 876272f..7e90950 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.ConfigurationException;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 2db631c..5256df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
 import org.apache.iotdb.db.exception.StorageEngineException;