You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:01 UTC

[iotdb] branch lmh/intoOperator updated (c62e8d6b7f -> 94cebb2572)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from c62e8d6b7f Implement analyzer for SELECT INTO statement
     new 2293ea4ca4 finish IntoOperator v1
     add a3749028b6 add IntoComponent
     add 7e14a8429f Merge remote-tracking branch 'origin/master' into lmh/selectInto
     add de41d7c81c finish sql parser for SELECT INTO statement
     add cf7d9cdee4 Merge remote-tracking branch 'origin/master' into lmh/selectInto
     add 55118b8b06 fix sql parser
     add df5d9e4eb6 add semantic check
     add 62b2d495b9 tmp save (analyzer for SELECT INTO)
     add bbeca341d5 Merge branch 'lmh/selectInto' into lmh/mppSelectInto
     add fb63d17c6b tmp save (analyzer for SELECT INTO)
     add fa3290a015 finish analyzer
     add 4904bea0ad refactor analyzer
     add 9113b516a6 add header
     add 66a200c55b fix bugs
     add 20502476aa Merge remote-tracking branch 'origin/master' into lmh/mppSelectInto
     add 15ca69b62b pr self check
     add 144638a5d9 fix CI
     add 9838423df3 Merge remote-tracking branch 'origin/master' into lmh/mppSelectInto
     add 93e707f0f1 change descriptor ALIGN BY DEVICE
     add c5306406e6 unified naming
     add 7ccd902bca Merge branch 'master' of github.com:apache/iotdb into lmh/mppSelectInto
     add 7adf5582a0 refactor IntoPathDescriptor & DeviceViewIntoPathDescriptor
     add ce39446b95 add UT
     add 3e50f56e3e disable GROUP BY TAGS
     new fbc3c41ece Merge branch 'lmh/mppSelectInto' into lmh/intoOperator
     new 9ce4eacfcb add IntoNode & DeviceViewNode
     new 248dacce56 implement LogicalPlan for SELECT INTO statement
     new aeff0ab7a9 add OperatorTreeGenerator for IntoOperator
     new 4646bf1285 fix bugs
     new 96481443c4 add DeviceViewIntoOperator
     new de98a1a31d implement DeviceViewIntoOperator
     new 41f8243a0d OperatorTreeGenerator  visitDeviceViewInto
     new a8d74daf90 finish execute insertMultiTabletsStatement
     new bb548fd9fa fix sessionId
     new 94cebb2572 add session info in FragmentInstance

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/client/DataNodeInternalClient.java    |  98 +++++++
 ...ledException.java => IntoProcessException.java} |   4 +-
 .../fragment/FragmentInstanceContext.java          |  39 ++-
 .../fragment/FragmentInstanceManager.java          |  12 +-
 .../db/mpp/execution/operator/OperatorContext.java |   8 +
 ...IntoOperator.java => AbstractIntoOperator.java} | 180 ++++++++-----
 .../operator/process/DeviceViewIntoOperator.java   | 135 ++++++++++
 .../execution/operator/process/IntoOperator.java   | 288 +++------------------
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   7 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  40 +++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |   7 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 123 +++++++++
 .../planner/distribution/ExchangeNodeAdder.java    |   6 +-
 .../SimpleFragmentParallelPlanner.java             |   2 +
 .../plan/planner/distribution/SourceRewriter.java  |  11 +-
 .../distribution/WriteFragmentParallelPlanner.java |   4 +-
 .../db/mpp/plan/planner/plan/FragmentInstance.java |  28 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   4 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  10 +
 .../metedata/read/SchemaQueryOrderByHeatNode.java  |   4 +-
 .../planner/plan/node/process/AggregationNode.java |  23 +-
 .../planner/plan/node/process/DeviceMergeNode.java |  32 +--
 .../{LimitNode.java => DeviceViewIntoNode.java}    |  97 +++----
 .../planner/plan/node/process/DeviceViewNode.java  |  19 +-
 .../planner/plan/node/process/ExchangeNode.java    |  41 +--
 .../plan/planner/plan/node/process/FillNode.java   |  46 +---
 .../plan/node/process/GroupByLevelNode.java        |  29 +--
 .../planner/plan/node/process/GroupByTagNode.java  |  17 +-
 .../node/process/{LimitNode.java => IntoNode.java} |  94 +++----
 .../plan/planner/plan/node/process/LimitNode.java  |  37 +--
 ...tiChildNode.java => MultiChildProcessNode.java} |  23 +-
 .../plan/planner/plan/node/process/OffsetNode.java |  36 +--
 .../planner/plan/node/process/ProjectNode.java     |  40 +--
 ...iChildNode.java => SingleChildProcessNode.java} |  50 +++-
 .../node/process/SlidingWindowAggregationNode.java |  36 +--
 .../plan/planner/plan/node/process/SortNode.java   |  29 +--
 .../planner/plan/node/process/TimeJoinNode.java    |  17 +-
 .../planner/plan/node/process/TransformNode.java   |  29 +--
 .../node/process/last/LastQueryCollectNode.java    |   4 +-
 .../plan/node/process/last/LastQueryMergeNode.java |   4 +-
 .../plan/node/process/last/LastQueryNode.java      |   4 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 177 +++++++++++--
 .../planner/plan/parameter/IntoPathDescriptor.java | 132 +++++++++-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   8 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |   3 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 ++++++++++++
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |   9 +-
 .../plan/scheduler/StandaloneSchedulerTest.java    |  21 +-
 48 files changed, 1361 insertions(+), 875 deletions(-)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
 copy server/src/main/java/org/apache/iotdb/db/exception/{WriteLockFailedException.java => IntoProcessException.java} (88%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/{IntoOperator.java => AbstractIntoOperator.java} (63%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{LimitNode.java => DeviceViewIntoNode.java} (50%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{LimitNode.java => IntoNode.java} (55%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{MultiChildNode.java => MultiChildProcessNode.java} (77%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{MultiChildNode.java => SingleChildProcessNode.java} (59%)


[iotdb] 11/12: fix sessionId

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bb548fd9fa488a3b0ed0bb2b5a1d0ac3e36fbfb5
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 17:30:32 2022 +0800

    fix sessionId
---
 .../iotdb/db/client/DataNodeInternalClient.java    |  6 +++--
 .../fragment/FragmentInstanceContext.java          | 27 ++++++++++++++++++---
 .../fragment/FragmentInstanceManager.java          | 12 ++++++++--
 .../db/mpp/execution/operator/OperatorContext.java |  8 +++++++
 .../operator/process/AbstractIntoOperator.java     |  6 +++--
 .../SimpleFragmentParallelPlanner.java             |  2 ++
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 28 +++++++++++++++++++---
 .../iotdb/db/query/control/SessionManager.java     |  5 ----
 8 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 43f7271bd2..e1e5bde993 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,7 +54,7 @@ public class DataNodeInternalClient {
 
   private final long sessionId;
 
-  public DataNodeInternalClient() {
+  public DataNodeInternalClient(String userName, String zoneId) {
     if (config.isClusterMode()) {
       PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
       SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
@@ -61,7 +62,8 @@ public class DataNodeInternalClient {
       PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
       SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
     }
-    sessionId = SESSION_MANAGER.requestInternalSessionId();
+    this.sessionId =
+        SESSION_MANAGER.requestSessionId(userName, zoneId, IoTDBConstant.ClientVersion.V_0_13);
   }
 
   public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 41a23f8d00..5976897529 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -58,6 +58,10 @@ public class FragmentInstanceContext extends QueryContext {
   private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
   private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
+  // session info
+  private String userName;
+  private String zoneId;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -67,8 +71,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
-    FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine);
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      String userName,
+      String zoneId) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(id, stateMachine, userName, zoneId);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -79,10 +87,15 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   private FragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      String userName,
+      String zoneId) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
+    this.userName = userName;
+    this.zoneId = zoneId;
   }
 
   // used for compaction
@@ -200,4 +213,12 @@ public class FragmentInstanceContext extends QueryContext {
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getZoneId() {
+    return zoneId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 16bf91ba36..921856babc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -108,7 +108,11 @@ public class FragmentInstanceManager {
                     instanceContext.computeIfAbsent(
                         instanceId,
                         fragmentInstanceId ->
-                            createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+                            createFragmentInstanceContext(
+                                fragmentInstanceId,
+                                stateMachine,
+                                instance.getUserName(),
+                                instance.getZoneId()));
 
                 try {
                   DataDriver driver =
@@ -151,7 +155,11 @@ public class FragmentInstanceManager {
                   instanceContext.computeIfAbsent(
                       instanceId,
                       fragmentInstanceId ->
-                          createFragmentInstanceContext(fragmentInstanceId, stateMachine));
+                          createFragmentInstanceContext(
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getUserName(),
+                              instance.getZoneId()));
 
               try {
                 SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 356e0c4eed..02c5311901 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -70,6 +70,14 @@ public class OperatorContext {
     this.maxRunTime = maxRunTime;
   }
 
+  public String getUserName() {
+    return instanceContext.getUserName();
+  }
+
+  public String getZoneId() {
+    return instanceContext.getZoneId();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 21cf6951fa..4c13359e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
-  private final DataNodeInternalClient client = new DataNodeInternalClient();
+  private final DataNodeInternalClient client;
 
   public AbstractIntoOperator(
       OperatorContext operatorContext,
@@ -70,6 +70,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+    this.client =
+        new DataNodeInternalClient(operatorContext.getUserName(), operatorContext.getZoneId());
   }
 
   protected static List<IntoOperator.InsertTabletStatementGenerator>
@@ -109,7 +111,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       String message =
           String.format(
-              "Error occurred while inserting tablets in SELECT INTO. %s",
+              "Error occurred while inserting tablets in SELECT INTO: %s",
               executionStatus.getMessage());
       LOGGER.error(message);
       throw new IntoProcessException(message);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index f83730b187..b46ac5b36d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -100,6 +100,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
             timeFilter,
             queryContext.getQueryType(),
             queryContext.getTimeOut(),
+            queryContext.getSession().getUserName(),
+            queryContext.getSession().getZoneId(),
             fragment.isRoot());
 
     // Get the target region for origin PlanFragment, then its instance will be distributed one
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 15a25b8dcf..97790ed63e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -65,6 +65,10 @@ public class FragmentInstance implements IConsensusRequest {
 
   private boolean isRoot;
 
+  // session info
+  private final String userName;
+  private final String zoneId;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -73,13 +77,17 @@ public class FragmentInstance implements IConsensusRequest {
       FragmentInstanceId id,
       Filter timeFilter,
       QueryType type,
-      long timeOut) {
+      long timeOut,
+      String userName,
+      String zoneId) {
     this.fragment = fragment;
     this.timeFilter = timeFilter;
     this.id = id;
     this.type = type;
     this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
     this.isRoot = false;
+    this.userName = userName;
+    this.zoneId = zoneId;
   }
 
   public FragmentInstance(
@@ -88,8 +96,10 @@ public class FragmentInstance implements IConsensusRequest {
       Filter timeFilter,
       QueryType type,
       long timeOut,
+      String userName,
+      String zoneId,
       boolean isRoot) {
-    this(fragment, id, timeFilter, type, timeOut);
+    this(fragment, id, timeFilter, type, timeOut, userName, zoneId);
     this.isRoot = isRoot;
   }
 
@@ -188,11 +198,13 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     PlanFragment planFragment = PlanFragment.deserialize(buffer);
     long timeOut = ReadWriteIOUtils.readLong(buffer);
+    String userName = ReadWriteIOUtils.readString(buffer);
+    String zoneId = ReadWriteIOUtils.readString(buffer);
     boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
     Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
-        new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut);
+        new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut, userName, zoneId);
     boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.hostDataNode =
         hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
@@ -205,6 +217,8 @@ public class FragmentInstance implements IConsensusRequest {
       id.serialize(outputStream);
       fragment.serialize(outputStream);
       ReadWriteIOUtils.write(timeOut, outputStream);
+      ReadWriteIOUtils.write(userName, outputStream);
+      ReadWriteIOUtils.write(zoneId, outputStream);
       ReadWriteIOUtils.write(timeFilter != null, outputStream);
       if (timeFilter != null) {
         timeFilter.serialize(outputStream);
@@ -246,4 +260,12 @@ public class FragmentInstance implements IConsensusRequest {
   public long getTimeOut() {
     return timeOut;
   }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getZoneId() {
+    return zoneId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 1ee80d5df2..53f140d317 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,11 +237,6 @@ public class SessionManager {
     return sessionId;
   }
 
-  public long requestInternalSessionId() {
-    return requestSessionId(
-        "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
-  }
-
   public boolean releaseSessionResource(long sessionId) {
     return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
   }


[iotdb] 09/12: OperatorTreeGenerator visitDeviceViewInto

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 41f8243a0d09e66aed2e3b8496f608c0dda2de01
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 14:45:15 2022 +0800

    OperatorTreeGenerator  visitDeviceViewInto
---
 .../operator/process/AbstractIntoOperator.java     |   3 +-
 .../operator/process/DeviceViewIntoOperator.java   |   4 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 106 +++++++++++++++++----
 .../plan/node/process/DeviceViewIntoNode.java      |   4 +
 4 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index d038874d0b..28b90daae7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -82,7 +82,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   }
 
   protected void insertMultiTabletsInternally(boolean needCheck) {
-    if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
+    if (insertTabletStatementGenerators == null
+        || (needCheck && !insertTabletStatementGenerators.get(0).isFull())
         || insertTabletStatementGenerators.get(0).isEmpty()) {
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index bf1a0920fa..f0eab0b71a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -105,6 +105,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   private void updateResultTsBlock() {
+    if (currentDevice == null) {
+      return;
+    }
+
     TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
     for (Pair<String, PartialPath> sourceTargetPathPair :
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e91d67b973..8fb6a0387e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
@@ -132,6 +133,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -157,6 +159,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -1351,17 +1354,95 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 IntoOperator.class.getSimpleName());
 
     IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
-    Map<String, List<InputLocation>> layout = makeLayout(node);
+    Map<String, InputLocation> sourceColumnToInputLocationMap =
+        constructSourceColumnToInputLocationMap(node);
+
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        new HashMap<>();
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+    processTargetPathToSourceMap(
+        intoPathDescriptor.getTargetPathToSourceMap(),
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        sourceColumnToInputLocationMap,
+        context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new IntoOperator(
+        operatorContext,
+        child,
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        intoPathDescriptor.getTargetDeviceToAlignedMap(),
+        intoPathDescriptor.getSourceTargetPathPairList(),
+        sourceColumnToInputLocationMap);
+  }
+
+  @Override
+  public Operator visitDeviceViewInto(DeviceViewIntoNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                DeviceViewIntoOperator.class.getSimpleName());
+
+    DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+        node.getDeviceViewIntoPathDescriptor();
+    Map<String, InputLocation> sourceColumnToInputLocationMap =
+        constructSourceColumnToInputLocationMap(node);
+
+    Map<String, Map<PartialPath, Map<String, InputLocation>>>
+        deviceToTargetPathSourceInputLocationMap = new HashMap<>();
+    Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap =
+        new HashMap<>();
+    Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
+        deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+    for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
+        sourceDeviceToTargetPathMap.entrySet()) {
+      String sourceDevice = deviceEntry.getKey();
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+          new HashMap<>();
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+      processTargetPathToSourceMap(
+          deviceEntry.getValue(),
+          targetPathToSourceInputLocationMap,
+          targetPathToDataTypeMap,
+          sourceColumnToInputLocationMap,
+          context.getTypeProvider());
+      deviceToTargetPathSourceInputLocationMap.put(
+          sourceDevice, targetPathToSourceInputLocationMap);
+      deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+    }
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new DeviceViewIntoOperator(
+        operatorContext,
+        child,
+        deviceToTargetPathSourceInputLocationMap,
+        deviceToTargetPathDataTypeMap,
+        deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
+        deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
+        sourceColumnToInputLocationMap);
+  }
+
+  private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
     Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+    Map<String, List<InputLocation>> layout = makeLayout(node);
     for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
       sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
     }
+    return sourceColumnToInputLocationMap;
+  }
 
-    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
-        new HashMap<>();
-    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
-    Map<PartialPath, Map<String, String>> targetPathToSourceMap =
-        intoPathDescriptor.getTargetPathToSourceMap();
+  private void processTargetPathToSourceMap(
+      Map<PartialPath, Map<String, String>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      TypeProvider typeProvider) {
     for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
       PartialPath targetDevice = entry.getKey();
       Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
@@ -1371,22 +1452,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         String sourceColumn = measurementEntry.getValue();
         measurementToInputLocationMap.put(
             targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
-        measurementToDataTypeMap.put(
-            targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+        measurementToDataTypeMap.put(targetMeasurement, typeProvider.getType(sourceColumn));
       }
       targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
       targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
     }
-
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new IntoOperator(
-        operatorContext,
-        child,
-        targetPathToSourceInputLocationMap,
-        targetPathToDataTypeMap,
-        intoPathDescriptor.getTargetDeviceToAlignedMap(),
-        intoPathDescriptor.getSourceTargetPathPairList(),
-        sourceColumnToInputLocationMap);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 987cbc18d2..8c5055982c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -50,6 +50,10 @@ public class DeviceViewIntoNode extends SingleChildProcessNode {
     this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
+  public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
+    return deviceViewIntoPathDescriptor;
+  }
+
   @Override
   public PlanNode clone() {
     return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);


[iotdb] 03/12: add IntoNode & DeviceViewNode

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ce4eacfcbf806a44f93830df73e3f377043efea
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Oct 17 22:20:17 2022 +0800

    add IntoNode & DeviceViewNode
---
 .../planner/distribution/ExchangeNodeAdder.java    |  6 +-
 .../plan/planner/distribution/SourceRewriter.java  | 11 +--
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  4 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +++
 .../metedata/read/SchemaQueryOrderByHeatNode.java  |  4 +-
 .../planner/plan/node/process/AggregationNode.java | 23 ++---
 .../planner/plan/node/process/DeviceMergeNode.java | 32 +------
 .../{LimitNode.java => DeviceViewIntoNode.java}    | 97 +++++++++-------------
 .../planner/plan/node/process/DeviceViewNode.java  | 19 +----
 .../planner/plan/node/process/ExchangeNode.java    | 42 +---------
 .../plan/planner/plan/node/process/FillNode.java   | 46 ++--------
 .../plan/node/process/GroupByLevelNode.java        | 29 +++----
 .../planner/plan/node/process/GroupByTagNode.java  | 17 +---
 .../process/{OffsetNode.java => IntoNode.java}     | 91 +++++++++-----------
 .../plan/planner/plan/node/process/LimitNode.java  | 37 ++-------
 ...tiChildNode.java => MultiChildProcessNode.java} | 23 ++++-
 .../plan/planner/plan/node/process/OffsetNode.java | 36 ++------
 .../planner/plan/node/process/ProjectNode.java     | 40 +++------
 ...iChildNode.java => SingleChildProcessNode.java} | 50 ++++++++---
 .../node/process/SlidingWindowAggregationNode.java | 36 ++------
 .../plan/planner/plan/node/process/SortNode.java   | 29 ++-----
 .../planner/plan/node/process/TimeJoinNode.java    | 17 +---
 .../planner/plan/node/process/TransformNode.java   | 29 +------
 .../node/process/last/LastQueryCollectNode.java    |  4 +-
 .../plan/node/process/last/LastQueryMergeNode.java |  4 +-
 .../plan/node/process/last/LastQueryNode.java      |  4 +-
 26 files changed, 237 insertions(+), 503 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 37c487b4ab..60c0a6a9b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -241,8 +241,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
-  private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
-    MultiChildNode newNode = (MultiChildNode) node.clone();
+  private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
     List<PlanNode> visitedChildren = new ArrayList<>();
     node.getChildren()
         .forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 8df3a16804..e6bc4891f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -254,7 +254,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   private PlanNode processRawSeriesScan(
-      SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
+      SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) {
     List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
     if (sourceNodes.size() == 1) {
       return sourceNodes.get(0);
@@ -407,8 +407,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return processRawMultiChildNode(node, context);
   }
 
-  private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
-    MultiChildNode root = (MultiChildNode) node.clone();
+  private PlanNode processRawMultiChildNode(
+      MultiChildProcessNode node, DistributionPlanContext context) {
+    MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
     // Step 1: Get all source nodes. For the node which is not source, add it as the child of
     // current TimeJoinNode
     List<SourceNode> sources = new ArrayList<>();
@@ -468,7 +469,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             } else {
               // We clone a TimeJoinNode from root to make the params to be consistent.
               // But we need to assign a new ID to it
-              MultiChildNode parentOfGroup = (MultiChildNode) root.clone();
+              MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone();
               parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
               seriesScanNodes.forEach(parentOfGroup::addChild);
               root.addChild(parentOfGroup);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index dd102898d5..8b7804bfd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -140,7 +140,9 @@ public enum PlanNodeType {
   LOAD_TSFILE((short) 55),
   CONSTRUCT_SCHEMA_BLACK_LIST_NODE((short) 56),
   ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57),
-  GROUP_BY_TAG((short) 58);
+  GROUP_BY_TAG((short) 58),
+  INTO((short) 59),
+  DEVICE_VIEW_INTO((short) 60);
 
   public static final int BYTES = Short.BYTES;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8b4155cb53..cad20d65de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,12 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -301,4 +303,12 @@ public abstract class PlanVisitor<R, C> {
   public R visitActivateTemplate(ActivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitInto(IntoNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
index 68cdf909cd..0c3d78ce6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
@@ -23,14 +23,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class SchemaQueryOrderByHeatNode extends MultiChildNode {
+public class SchemaQueryOrderByHeatNode extends MultiChildProcessNode {
 
   public SchemaQueryOrderByHeatNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 837be95747..4448ca0c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
  * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
  * final series aggregated result represented by TsBlock.
  */
-public class AggregationNode extends MultiChildNode {
+public class AggregationNode extends MultiChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one or two column
   // of
@@ -74,8 +74,10 @@ public class AggregationNode extends MultiChildNode {
       List<AggregationDescriptor> aggregationDescriptorList,
       @Nullable GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
-    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
-    this.children = children;
+    super(id, children);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -91,21 +93,6 @@ public class AggregationNode extends MultiChildNode {
     return scanOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new AggregationNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index 141530d3f4..d3ec660e7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class DeviceMergeNode extends MultiChildNode {
+public class DeviceMergeNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by device and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -43,16 +43,6 @@ public class DeviceMergeNode extends MultiChildNode {
   // the list of selected devices
   private final List<String> devices;
 
-  public DeviceMergeNode(
-      PlanNodeId id,
-      List<PlanNode> children,
-      OrderByParameter mergeOrderParameter,
-      List<String> devices) {
-    super(id, children);
-    this.mergeOrderParameter = mergeOrderParameter;
-    this.devices = devices;
-  }
-
   public DeviceMergeNode(
       PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> devices) {
     super(id);
@@ -68,21 +58,6 @@ public class DeviceMergeNode extends MultiChildNode {
     return devices;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
@@ -147,13 +122,12 @@ public class DeviceMergeNode extends MultiChildNode {
     }
     DeviceMergeNode that = (DeviceMergeNode) o;
     return Objects.equals(mergeOrderParameter, that.mergeOrderParameter)
-        && Objects.equals(devices, that.devices)
-        && Objects.equals(children, that.children);
+        && Objects.equals(devices, that.devices);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), mergeOrderParameter, devices, children);
+    return Objects.hash(super.hashCode(), mergeOrderParameter, devices);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 9d3badac0e..987cbc18d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,102 +16,74 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class DeviceViewIntoNode extends SingleChildProcessNode {
 
-  private final int limit;
+  private final DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor;
 
-  private PlanNode child;
-
-  public LimitNode(PlanNodeId id, int limit) {
+  public DeviceViewIntoNode(
+      PlanNodeId id, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
     super(id);
-    this.limit = limit;
-  }
-
-  public LimitNode(PlanNodeId id, PlanNode child, int limit) {
-    this(id, limit);
-    this.child = child;
-  }
-
-  public int getLimit() {
-    return limit;
-  }
-
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
+    this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
+  public DeviceViewIntoNode(
+      PlanNodeId id, PlanNode child, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+    super(id, child);
+    this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(getPlanNodeId(), this.limit);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitLimit(this, context);
+    return ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.LIMIT.serialize(byteBuffer);
-    ReadWriteIOUtils.write(limit, byteBuffer);
+    PlanNodeType.INTO.serialize(byteBuffer);
+    this.deviceViewIntoPathDescriptor.serialize(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    PlanNodeType.LIMIT.serialize(stream);
-    ReadWriteIOUtils.write(limit, stream);
+    PlanNodeType.INTO.serialize(stream);
+    this.deviceViewIntoPathDescriptor.serialize(stream);
   }
 
-  public static LimitNode deserialize(ByteBuffer byteBuffer) {
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
+  public static DeviceViewIntoNode deserialize(ByteBuffer byteBuffer) {
+    DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+        DeviceViewIntoPathDescriptor.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LimitNode(planNodeId, limit);
+    return new DeviceViewIntoNode(planNodeId, deviceViewIntoPathDescriptor);
   }
 
   @Override
-  public String toString() {
-    return "LimitNode-" + this.getPlanNodeId();
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeviceViewInto(this, context);
   }
 
   @Override
@@ -125,12 +97,17 @@ public class LimitNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    LimitNode that = (LimitNode) o;
-    return limit == that.limit && child.equals(that.child);
+    DeviceViewIntoNode intoNode = (DeviceViewIntoNode) o;
+    return deviceViewIntoPathDescriptor.equals(intoNode.deviceViewIntoPathDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), limit, child);
+    return Objects.hash(super.hashCode(), deviceViewIntoPathDescriptor);
+  }
+
+  @Override
+  public String toString() {
+    return "DeviceViewIntoNode-" + getPlanNodeId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 034db109de..8517904d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -42,7 +42,7 @@ import java.util.Objects;
  * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
  * contain n+1 columns where the new column is Device column.
  */
-public class DeviceViewNode extends MultiChildNode {
+public class DeviceViewNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by device and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -95,21 +95,6 @@ public class DeviceViewNode extends MultiChildNode {
     return deviceToMeasurementIndexesMap;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new DeviceViewNode(
@@ -224,7 +209,6 @@ public class DeviceViewNode extends MultiChildNode {
     DeviceViewNode that = (DeviceViewNode) o;
     return mergeOrderParameter.equals(that.mergeOrderParameter)
         && devices.equals(that.devices)
-        && children.equals(that.children)
         && outputColumnNames.equals(that.outputColumnNames)
         && deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
   }
@@ -235,7 +219,6 @@ public class DeviceViewNode extends MultiChildNode {
         super.hashCode(),
         mergeOrderParameter,
         devices,
-        children,
         outputColumnNames,
         deviceToMeasurementIndexesMap);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 780a0e629f..0a4b64f3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,8 +35,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ExchangeNode extends PlanNode {
-  private PlanNode child;
+public class ExchangeNode extends SingleChildProcessNode {
+
   // The remoteSourceNode is used to record the remote source info for current ExchangeNode
   // It is not the child of current ExchangeNode
   private FragmentSinkNode remoteSourceNode;
@@ -56,24 +54,11 @@ public class ExchangeNode extends PlanNode {
     super(id);
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    if (this.child == null) {
-      return ImmutableList.of();
-    }
-    return ImmutableList.of(child);
-  }
-
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitExchange(this, context);
   }
 
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getPlanNodeId());
@@ -85,11 +70,6 @@ public class ExchangeNode extends PlanNode {
     return node;
   }
 
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumnNames;
@@ -150,14 +130,6 @@ public class ExchangeNode extends PlanNode {
     }
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public String toString() {
     return String.format(
@@ -182,10 +154,6 @@ public class ExchangeNode extends PlanNode {
     this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
   }
 
-  public void cleanChildren() {
-    this.child = null;
-  }
-
   public TEndPoint getUpstreamEndpoint() {
     return upstreamEndpoint;
   }
@@ -210,15 +178,13 @@ public class ExchangeNode extends PlanNode {
       return false;
     }
     ExchangeNode that = (ExchangeNode) o;
-    return Objects.equals(child, that.child)
-        && Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
+    return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
         && Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
         && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
+    return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
index 5acb18a9de..d177d98d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -35,48 +33,24 @@ import java.util.List;
 import java.util.Objects;
 
 /** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode {
+public class FillNode extends SingleChildProcessNode {
 
   // descriptions of how null values are filled
-  private FillDescriptor fillDescriptor;
-
-  private Ordering scanOrder;
-
-  private PlanNode child;
+  private final FillDescriptor fillDescriptor;
 
-  public FillNode(PlanNodeId id) {
-    super(id);
-  }
+  private final Ordering scanOrder;
 
   public FillNode(PlanNodeId id, FillDescriptor fillDescriptor, Ordering scanOrder) {
-    this(id);
+    super(id);
     this.fillDescriptor = fillDescriptor;
     this.scanOrder = scanOrder;
   }
 
   public FillNode(
       PlanNodeId id, PlanNode child, FillDescriptor fillDescriptor, Ordering scanOrder) {
-    this(id, fillDescriptor, scanOrder);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  public PlanNode getChild() {
-    return child;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    super(id, child);
+    this.fillDescriptor = fillDescriptor;
+    this.scanOrder = scanOrder;
   }
 
   @Override
@@ -127,14 +101,12 @@ public class FillNode extends ProcessNode {
       return false;
     }
     FillNode that = (FillNode) o;
-    return Objects.equals(fillDescriptor, that.fillDescriptor)
-        && Objects.equals(child, that.child)
-        && scanOrder == that.scanOrder;
+    return Objects.equals(fillDescriptor, that.fillDescriptor) && scanOrder == that.scanOrder;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), fillDescriptor, child, scanOrder);
+    return Objects.hash(super.hashCode(), fillDescriptor, scanOrder);
   }
 
   public FillDescriptor getFillDescriptor() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 0fa39ff184..da7b8dc9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
  * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
  * bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
  */
-public class GroupByLevelNode extends MultiChildNode {
+public class GroupByLevelNode extends MultiChildProcessNode {
 
   // The list of aggregate descriptors
   // each GroupByLevelDescriptor will be output as one or two column of result TsBlock
@@ -87,21 +87,6 @@ public class GroupByLevelNode extends MultiChildNode {
     this.scanOrder = scanOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new GroupByLevelNode(
@@ -191,9 +176,15 @@ public class GroupByLevelNode extends MultiChildNode {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     GroupByLevelNode that = (GroupByLevelNode) o;
     return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors)
         && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index e0ef548508..c2e558bdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -43,7 +43,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class GroupByTagNode extends MultiChildNode {
+public class GroupByTagNode extends MultiChildProcessNode {
 
   private final List<String> tagKeys;
   private final Map<List<String>, List<CrossSeriesAggregationDescriptor>>
@@ -87,16 +87,6 @@ public class GroupByTagNode extends MultiChildNode {
     this.outputColumnNames = Validate.notNull(outputColumnNames);
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
   @Override
   public PlanNode clone() {
     // TODO: better do deep copy
@@ -109,11 +99,6 @@ public class GroupByTagNode extends MultiChildNode {
         this.outputColumnNames);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index 0407f6d946..b68e862a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -16,96 +16,71 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-/**
- * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
- * upstream nodes
- */
-public class OffsetNode extends ProcessNode {
-
-  private final int offset;
+public class IntoNode extends SingleChildProcessNode {
 
-  private PlanNode child;
+  private final IntoPathDescriptor intoPathDescriptor;
 
-  public OffsetNode(PlanNodeId id, int offset) {
+  public IntoNode(PlanNodeId id, IntoPathDescriptor intoPathDescriptor) {
     super(id);
-    this.offset = offset;
+    this.intoPathDescriptor = intoPathDescriptor;
   }
 
-  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
-    this(id, offset);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+  public IntoNode(PlanNodeId id, PlanNode child, IntoPathDescriptor intoPathDescriptor) {
+    super(id, child);
+    this.intoPathDescriptor = intoPathDescriptor;
   }
 
   @Override
   public PlanNode clone() {
-    return new OffsetNode(getPlanNodeId(), offset);
+    return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitOffset(this, context);
+    return ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.OFFSET.serialize(byteBuffer);
-    ReadWriteIOUtils.write(offset, byteBuffer);
+    PlanNodeType.INTO.serialize(byteBuffer);
+    this.intoPathDescriptor.serialize(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    PlanNodeType.OFFSET.serialize(stream);
-    ReadWriteIOUtils.write(offset, stream);
+    PlanNodeType.INTO.serialize(stream);
+    this.intoPathDescriptor.serialize(stream);
   }
 
-  public static OffsetNode deserialize(ByteBuffer byteBuffer) {
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+  public static IntoNode deserialize(ByteBuffer byteBuffer) {
+    IntoPathDescriptor intoPathDescriptor = IntoPathDescriptor.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new OffsetNode(planNodeId, offset);
+    return new IntoNode(planNodeId, intoPathDescriptor);
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public int getOffset() {
-    return offset;
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitInto(this, context);
   }
 
   @Override
@@ -116,12 +91,20 @@ public class OffsetNode extends ProcessNode {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    OffsetNode that = (OffsetNode) o;
-    return offset == that.offset && child.equals(that.child);
+    if (!super.equals(o)) {
+      return false;
+    }
+    IntoNode intoNode = (IntoNode) o;
+    return intoPathDescriptor.equals(intoNode.intoPathDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(child, offset);
+    return Objects.hash(super.hashCode(), intoPathDescriptor);
+  }
+
+  @Override
+  public String toString() {
+    return "IntoNode-" + getPlanNodeId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
index 9d3badac0e..ae03e13f70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,54 +31,29 @@ import java.util.List;
 import java.util.Objects;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class LimitNode extends SingleChildProcessNode {
 
   private final int limit;
 
-  private PlanNode child;
-
   public LimitNode(PlanNodeId id, int limit) {
     super(id);
     this.limit = limit;
   }
 
   public LimitNode(PlanNodeId id, PlanNode child, int limit) {
-    this(id, limit);
-    this.child = child;
+    super(id, child);
+    this.limit = limit;
   }
 
   public int getLimit() {
     return limit;
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public PlanNode clone() {
     return new LimitNode(getPlanNodeId(), this.limit);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     return child.getOutputColumnNames();
@@ -126,11 +99,11 @@ public class LimitNode extends ProcessNode {
       return false;
     }
     LimitNode that = (LimitNode) o;
-    return limit == that.limit && child.equals(that.child);
+    return limit == that.limit;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), limit, child);
+    return Objects.hash(super.hashCode(), limit);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
index 9e699f370c..e23a86d0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
@@ -26,16 +26,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class MultiChildProcessNode extends ProcessNode {
 
   protected List<PlanNode> children;
 
-  public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+  public MultiChildProcessNode(PlanNodeId id, List<PlanNode> children) {
     super(id);
     this.children = children;
   }
 
-  public MultiChildNode(PlanNodeId id) {
+  public MultiChildProcessNode(PlanNodeId id) {
     super(id);
     this.children = new ArrayList<>();
   }
@@ -44,6 +44,21 @@ public abstract class MultiChildNode extends ProcessNode {
     this.children = children;
   }
 
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -55,7 +70,7 @@ public abstract class MultiChildNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    MultiChildNode that = (MultiChildNode) o;
+    MultiChildProcessNode that = (MultiChildProcessNode) o;
     return children.equals(that.children);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
index 0407f6d946..64b912303b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -36,35 +34,18 @@ import java.util.Objects;
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
  * upstream nodes
  */
-public class OffsetNode extends ProcessNode {
+public class OffsetNode extends SingleChildProcessNode {
 
   private final int offset;
 
-  private PlanNode child;
-
   public OffsetNode(PlanNodeId id, int offset) {
     super(id);
     this.offset = offset;
   }
 
   public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
-    this(id, offset);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    super(id, child);
+    this.offset = offset;
   }
 
   @Override
@@ -100,10 +81,6 @@ public class OffsetNode extends ProcessNode {
     return new OffsetNode(planNodeId, offset);
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
   public int getOffset() {
     return offset;
   }
@@ -116,12 +93,15 @@ public class OffsetNode extends ProcessNode {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
+    if (!super.equals(o)) {
+      return false;
+    }
     OffsetNode that = (OffsetNode) o;
-    return offset == that.offset && child.equals(that.child);
+    return offset == that.offset;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(child, offset);
+    return Objects.hash(super.hashCode(), offset);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
index 3a9b5a7268..d5e71b6257 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -34,38 +32,20 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ProjectNode extends ProcessNode {
+public class ProjectNode extends SingleChildProcessNode {
 
   private final List<String> outputColumnNames;
 
-  private PlanNode child;
-
   public ProjectNode(PlanNodeId id, List<String> outputColumnNames) {
     super(id);
     this.outputColumnNames = outputColumnNames;
   }
 
   public ProjectNode(PlanNodeId id, PlanNode child, List<String> outputColumnNames) {
-    super(id);
-    this.child = child;
+    super(id, child);
     this.outputColumnNames = outputColumnNames;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new ProjectNode(getPlanNodeId(), getOutputColumnNames());
@@ -112,15 +92,21 @@ public class ProjectNode extends ProcessNode {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     ProjectNode that = (ProjectNode) o;
-    return outputColumnNames.equals(that.outputColumnNames) && child.equals(that.child);
+    return outputColumnNames.equals(that.outputColumnNames);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), outputColumnNames, child);
+    return Objects.hash(super.hashCode(), outputColumnNames);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
index 9e699f370c..bd183cf3ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
@@ -22,26 +22,52 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
+
 import java.util.List;
 import java.util.Objects;
 
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class SingleChildProcessNode extends ProcessNode {
 
-  protected List<PlanNode> children;
+  protected PlanNode child;
 
-  public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+  public SingleChildProcessNode(PlanNodeId id) {
     super(id);
-    this.children = children;
   }
 
-  public MultiChildNode(PlanNodeId id) {
+  public SingleChildProcessNode(PlanNodeId id, PlanNode child) {
     super(id);
-    this.children = new ArrayList<>();
+    this.child = child;
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public void setChild(PlanNode child) {
+    this.child = child;
+  }
+
+  public void cleanChildren() {
+    this.child = null;
   }
 
-  public void setChildren(List<PlanNode> children) {
-    this.children = children;
+  @Override
+  public List<PlanNode> getChildren() {
+    if (this.child == null) {
+      return ImmutableList.of();
+    }
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
@@ -55,12 +81,12 @@ public abstract class MultiChildNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    MultiChildNode that = (MultiChildNode) o;
-    return children.equals(that.children);
+    SingleChildProcessNode that = (SingleChildProcessNode) o;
+    return Objects.equals(child, that.child);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), children);
+    return Objects.hash(super.hashCode(), child);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index a2f4935791..49d470a780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -38,7 +36,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class SlidingWindowAggregationNode extends ProcessNode {
+public class SlidingWindowAggregationNode extends SingleChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one column of
   // result TsBlock
@@ -49,8 +47,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
 
   protected Ordering scanOrder;
 
-  private PlanNode child;
-
   public SlidingWindowAggregationNode(
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
@@ -68,8 +64,10 @@ public class SlidingWindowAggregationNode extends ProcessNode {
       List<AggregationDescriptor> aggregationDescriptorList,
       GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
-    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
-    this.child = child;
+    super(id, child);
+    this.aggregationDescriptorList = aggregationDescriptorList;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -88,25 +86,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
     return scanOrder;
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new SlidingWindowAggregationNode(
@@ -189,13 +168,12 @@ public class SlidingWindowAggregationNode extends ProcessNode {
     }
     SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o;
     return Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList)
-        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
-        && Objects.equals(child, that.child);
+        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter, child);
+    return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter);
   }
 
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
index 283987eb97..e033fd163d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,9 +35,7 @@ import java.util.Objects;
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
  * optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode {
-
-  private PlanNode child;
+public class SortNode extends SingleChildProcessNode {
 
   private final Ordering sortOrder;
 
@@ -49,29 +45,14 @@ public class SortNode extends ProcessNode {
   }
 
   public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
-    this(id, sortOrder);
-    this.child = child;
+    super(id, child);
+    this.sortOrder = sortOrder;
   }
 
   public Ordering getSortOrder() {
     return sortOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new SortNode(getPlanNodeId(), sortOrder);
@@ -117,11 +98,11 @@ public class SortNode extends ProcessNode {
       return false;
     }
     SortNode sortNode = (SortNode) o;
-    return child.equals(sortNode.child) && sortOrder == sortNode.sortOrder;
+    return sortOrder == sortNode.sortOrder;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), child, sortOrder);
+    return Objects.hash(super.hashCode(), sortOrder);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 5685e6d95b..2598cd4e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
  * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
  * TimeJoinOperator is sorted by timestamp
  */
-public class TimeJoinNode extends MultiChildNode {
+public class TimeJoinNode extends MultiChildProcessNode {
 
   // This parameter indicates the order when executing multiway merge sort.
   private final Ordering mergeOrder;
@@ -57,21 +57,6 @@ public class TimeJoinNode extends MultiChildNode {
     return mergeOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
index f6233e2e9d..112e53a01b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -38,9 +36,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-public class TransformNode extends ProcessNode {
-
-  protected PlanNode childPlanNode;
+public class TransformNode extends SingleChildProcessNode {
 
   protected final Expression[] outputExpressions;
   protected final boolean keepNull;
@@ -52,13 +48,12 @@ public class TransformNode extends ProcessNode {
 
   public TransformNode(
       PlanNodeId id,
-      PlanNode childPlanNode,
+      PlanNode child,
       Expression[] outputExpressions,
       boolean keepNull,
       ZoneId zoneId,
       Ordering scanOrder) {
-    super(id);
-    this.childPlanNode = childPlanNode;
+    super(id, child);
     this.outputExpressions = outputExpressions;
     this.keepNull = keepNull;
     this.zoneId = zoneId;
@@ -78,21 +73,6 @@ public class TransformNode extends ProcessNode {
     this.scanOrder = scanOrder;
   }
 
-  @Override
-  public final List<PlanNode> getChildren() {
-    return ImmutableList.of(childPlanNode);
-  }
-
-  @Override
-  public final void addChild(PlanNode childPlanNode) {
-    this.childPlanNode = childPlanNode;
-  }
-
-  @Override
-  public final int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public final List<String> getOutputColumnNames() {
     if (outputColumnNames == null) {
@@ -185,7 +165,6 @@ public class TransformNode extends ProcessNode {
     }
     TransformNode that = (TransformNode) o;
     return keepNull == that.keepNull
-        && childPlanNode.equals(that.childPlanNode)
         && Arrays.equals(outputExpressions, that.outputExpressions)
         && zoneId.equals(that.zoneId)
         && scanOrder == that.scanOrder;
@@ -193,7 +172,7 @@ public class TransformNode extends ProcessNode {
 
   @Override
   public int hashCode() {
-    int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, zoneId, scanOrder);
+    int result = Objects.hash(super.hashCode(), keepNull, zoneId, scanOrder);
     result = 31 * result + Arrays.hashCode(outputExpressions);
     return result;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index a81a5fb455..5f38f64a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,7 +32,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryCollectNode extends MultiChildNode {
+public class LastQueryCollectNode extends MultiChildProcessNode {
 
   public LastQueryCollectNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 482fc0110d..accb4dcbd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 
 import java.io.DataOutputStream;
@@ -33,7 +33,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryMergeNode extends MultiChildNode {
+public class LastQueryMergeNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by sensor and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index 23b40ca829..b8e1cf422f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
@@ -38,7 +38,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryNode extends MultiChildNode {
+public class LastQueryNode extends MultiChildProcessNode {
 
   private final Filter timeFilter;
 


[iotdb] 01/12: finish IntoOperator v1

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2293ea4ca46e4ec95ff65f7950f69b1b5ca0429c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 19:01:35 2022 +0800

    finish IntoOperator v1
---
 .../execution/operator/process/IntoOperator.java   | 65 +++++++++++++++++++---
 1 file changed, 58 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index c5ca4cc091..1251bb3e50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,15 +33,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class IntoOperator implements ProcessOperator {
 
@@ -47,18 +55,24 @@ public class IntoOperator implements ProcessOperator {
   private final Operator child;
 
   private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+  private final List<Pair<String, String>> sourceTargetPathPairList;
+  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
       Operator child,
       Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap,
+      List<Pair<String, String>> sourceTargetPathPairList,
+      Map<String, InputLocation> sourceColumnToInputLocationMap) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
             targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+    this.sourceTargetPathPairList = sourceTargetPathPairList;
+    this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
   }
 
   private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -129,11 +143,34 @@ public class IntoOperator implements ProcessOperator {
   }
 
   private TsBlock constructResultTsBlock() {
-    List<TSDataType> dataTypes = new ArrayList<>();
-    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) {
+      timeColumnBuilder.writeLong(0);
+      columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right));
+      columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
+      resultTsBlockBuilder.declarePosition();
+    }
     return resultTsBlockBuilder.build();
   }
 
+  private int findWritten(String sourceColumn) {
+    InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      int written = generator.getWrittenCount(inputLocation);
+      if (written != -1) {
+        return written;
+      }
+    }
+    return 0;
+  }
+
   @Override
   public boolean hasNext() {
     return child.hasNext();
@@ -151,20 +188,20 @@ public class IntoOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return 0;
+    return child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return 0;
+    return child.calculateMaxReturnSize();
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return 0;
+    return child.calculateRetainedSizeAfterCallingNext();
   }
 
-  private static class InsertTabletStatementGenerator {
+  public static class InsertTabletStatementGenerator {
 
     private final int TABLET_ROW_LIMIT =
         IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
@@ -181,6 +218,8 @@ public class IntoOperator implements ProcessOperator {
     private Object[] columns;
     private BitMap[] bitMaps;
 
+    private final Map<InputLocation, AtomicInteger> writtenCounter;
+
     public InsertTabletStatementGenerator(
         PartialPath devicePath,
         Map<String, InputLocation> measurementToInputLocationMap,
@@ -191,6 +230,10 @@ public class IntoOperator implements ProcessOperator {
       this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
       this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
       this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
+      this.writtenCounter = new HashMap<>();
+      for (InputLocation inputLocation : inputLocations) {
+        writtenCounter.put(inputLocation, new AtomicInteger(0));
+      }
     }
 
     public void reset() {
@@ -219,6 +262,7 @@ public class IntoOperator implements ProcessOperator {
           }
 
           bitMaps[i].unmark(rowCount);
+          writtenCounter.get(inputLocations[i]).getAndIncrement();
           switch (valueColumn.getDataType()) {
             case INT32:
               ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
@@ -305,5 +349,12 @@ public class IntoOperator implements ProcessOperator {
 
       return insertTabletStatement;
     }
+
+    public int getWrittenCount(InputLocation inputLocation) {
+      if (!writtenCounter.containsKey(inputLocation)) {
+        return -1;
+      }
+      return writtenCounter.get(inputLocation).get();
+    }
   }
 }


[iotdb] 12/12: add session info in FragmentInstance

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94cebb2572a9de5456af7a14e300cdbdd71d9910
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 19:12:23 2022 +0800

    add session info in FragmentInstance
---
 .../execution/fragment/FragmentInstanceContext.java | 12 ++++++++++++
 .../distribution/WriteFragmentParallelPlanner.java  |  4 +++-
 .../plan/scheduler/load/LoadTsFileScheduler.java    |  8 ++++++--
 .../db/mpp/plan/plan/FragmentInstanceSerdeTest.java |  9 +++++++--
 .../mpp/plan/scheduler/StandaloneSchedulerTest.java | 21 ++++++++++++++++-----
 5 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 5976897529..07e0c9bdb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.fragment;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -86,6 +88,16 @@ public class FragmentInstanceContext extends QueryContext {
     return new FragmentInstanceContext(queryId);
   }
 
+  @TestOnly
+  public static FragmentInstanceContext createFragmentInstanceContext(
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(id, stateMachine, "test", ZoneId.systemDefault().getId());
+    instanceContext.initialize();
+    instanceContext.start();
+    return instanceContext;
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 0bbf402818..7c526ef488 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -62,7 +62,9 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               fragment.getId().genFragmentInstanceId(),
               timeFilter,
               queryContext.getQueryType(),
-              queryContext.getTimeOut());
+              queryContext.getTimeOut(),
+              queryContext.getSession().getUserName(),
+              queryContext.getSession().getZoneId());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
       ret.add(instance);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index dd403d46ea..c89902b7a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -143,7 +143,9 @@ public class LoadTsFileScheduler implements IScheduler {
                 fragmentId.genFragmentInstanceId(),
                 null,
                 queryContext.getQueryType(),
-                queryContext.getTimeOut());
+                queryContext.getTimeOut(),
+                queryContext.getSession().getUserName(),
+                queryContext.getSession().getZoneId());
         instance.setDataRegionAndHost(entry.getKey());
         Future<FragInstanceDispatchResult> dispatchResultFuture =
             dispatcher.dispatch(Collections.singletonList(instance));
@@ -231,7 +233,9 @@ public class LoadTsFileScheduler implements IScheduler {
               fragmentId.genFragmentInstanceId(),
               null,
               queryContext.getQueryType(),
-              queryContext.getTimeOut());
+              queryContext.getTimeOut(),
+              queryContext.getSession().getUserName(),
+              queryContext.getSession().getZoneId());
       instance.setDataRegionAndHost(node.getLocalRegionReplicaSet());
       dispatcher.dispatchLocally(instance);
     } catch (FragmentInstanceDispatchException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index a1898ea023..04fa430e5e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -68,7 +69,9 @@ public class FragmentInstanceSerdeTest {
             planFragmentId.genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.READ,
-            config.getQueryTimeoutThreshold());
+            config.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
@@ -101,7 +104,9 @@ public class FragmentInstanceSerdeTest {
             planFragmentId.genFragmentInstanceId(),
             null,
             QueryType.READ,
-            config.getQueryTimeoutThreshold());
+            config.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 94fac23509..b80adbc755 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -65,6 +65,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -137,7 +138,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
@@ -240,7 +243,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
@@ -353,7 +358,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
@@ -404,7 +411,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
@@ -485,7 +494,9 @@ public class StandaloneSchedulerTest {
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.WRITE,
-            conf.getQueryTimeoutThreshold());
+            conf.getQueryTimeoutThreshold(),
+            "test",
+            ZoneId.systemDefault().getId());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);


[iotdb] 07/12: add DeviceViewIntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96481443c478805f79082a7be8d255dd6d7881de
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 10:53:53 2022 +0800

    add DeviceViewIntoOperator
---
 .../operator/process/DeviceViewIntoOperator.java   | 82 ++++++++++++++++++++++
 1 file changed, 82 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
new file mode 100644
index 0000000000..e1c38e662c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DeviceViewIntoOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  public DeviceViewIntoOperator(OperatorContext operatorContext, Operator child) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+}


[iotdb] 08/12: implement DeviceViewIntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de98a1a31da1e5b20a458fa5b6602108be374aed
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 16:57:37 2022 +0800

    implement DeviceViewIntoOperator
---
 ...IntoOperator.java => AbstractIntoOperator.java} | 115 +++-----
 .../operator/process/DeviceViewIntoOperator.java   | 128 ++++++---
 .../execution/operator/process/IntoOperator.java   | 302 +--------------------
 3 files changed, 126 insertions(+), 419 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 1491623e77..d038874d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,13 +29,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -47,43 +41,37 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-public class IntoOperator implements ProcessOperator {
+public abstract class AbstractIntoOperator implements ProcessOperator {
 
-  private final OperatorContext operatorContext;
-  private final Operator child;
+  protected final OperatorContext operatorContext;
+  protected final Operator child;
 
-  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
-  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
-  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
+  protected List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators;
 
-  public IntoOperator(
+  protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
+
+  public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap,
-      List<Pair<String, PartialPath>> sourceTargetPathPairList,
+      List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
     this.operatorContext = operatorContext;
     this.child = child;
-    this.insertTabletStatementGenerators =
-        constructInsertTabletStatementGenerators(
-            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
-    this.sourceTargetPathPairList = sourceTargetPathPairList;
+    this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
   }
 
-  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap) {
-    List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
+  protected static List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGenerators(
+          Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+          Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+          Map<String, Boolean> targetDeviceToAlignedMap) {
+    List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators =
         new ArrayList<>(targetPathToSourceInputLocationMap.size());
     for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
-      InsertTabletStatementGenerator generator =
-          new InsertTabletStatementGenerator(
+      IntoOperator.InsertTabletStatementGenerator generator =
+          new IntoOperator.InsertTabletStatementGenerator(
               targetDevice,
               targetPathToSourceInputLocationMap.get(targetDevice),
               targetPathToDataTypeMap.get(targetDevice),
@@ -93,45 +81,14 @@ public class IntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int lastReadIndex = 0;
-      while (lastReadIndex < inputTsBlock.getPositionCount()) {
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
-        }
-        insertMultiTabletsInternally(true);
-      }
-    }
-
-    if (child.hasNext()) {
-      return null;
-    } else {
-      insertMultiTabletsInternally(false);
-      return constructResultTsBlock();
-    }
-  }
-
-  private void insertMultiTabletsInternally(boolean needCheck) {
+  protected void insertMultiTabletsInternally(boolean needCheck) {
     if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
         || insertTabletStatementGenerators.get(0).isEmpty()) {
       return;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       insertTabletStatementList.add(generator.constructInsertTabletStatement());
     }
 
@@ -139,32 +96,14 @@ public class IntoOperator implements ProcessOperator {
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
     // TODO: execute insertMultiTabletsStatement
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       generator.reset();
     }
   }
 
-  private TsBlock constructResultTsBlock() {
-    List<TSDataType> outputDataTypes =
-        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
-            .map(ColumnHeader::getColumnType)
-            .collect(Collectors.toList());
-    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
-      timeColumnBuilder.writeLong(0);
-      columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
-      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
-      columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
-      resultTsBlockBuilder.declarePosition();
-    }
-    return resultTsBlockBuilder.build();
-  }
-
-  private int findWritten(String sourceColumn) {
+  protected int findWritten(String sourceColumn) {
     InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       int written = generator.getWrittenCount(inputLocation);
       if (written != -1) {
         return written;
@@ -173,6 +112,16 @@ public class IntoOperator implements ProcessOperator {
     return 0;
   }
 
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
   @Override
   public boolean hasNext() {
     return child.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e1c38e662c..bf1a0920fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -19,64 +19,112 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
-public class DeviceViewIntoOperator implements ProcessOperator {
+public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
-  private final OperatorContext operatorContext;
-  private final Operator child;
+  private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
+      deviceToTargetPathSourceInputLocationMap;
+  private final Map<String, Map<PartialPath, Map<String, TSDataType>>>
+      deviceToTargetPathDataTypeMap;
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
 
-  public DeviceViewIntoOperator(OperatorContext operatorContext, Operator child) {
-    this.operatorContext = operatorContext;
-    this.child = child;
-  }
+  private String currentDevice;
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
+  private final TsBlockBuilder resultTsBlockBuilder;
 
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
-  }
+  public DeviceViewIntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<String, Map<PartialPath, Map<String, InputLocation>>>
+          deviceToTargetPathSourceInputLocationMap,
+      Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+    this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
+    this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
 
-  @Override
-  public TsBlock next() {
-    return null;
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
   @Override
-  public boolean hasNext() {
-    return child.hasNext();
-  }
-
-  @Override
-  public void close() throws Exception {
-    child.close();
-  }
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock != null) {
+      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+      if (!Objects.equals(device, currentDevice)) {
+        insertMultiTabletsInternally(false);
+        updateResultTsBlock();
 
-  @Override
-  public boolean isFinished() {
-    return child.isFinished();
-  }
+        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+        currentDevice = device;
+      }
+      int lastReadIndex = 0;
+      while (lastReadIndex < inputTsBlock.getPositionCount()) {
+        for (IntoOperator.InsertTabletStatementGenerator generator :
+            insertTabletStatementGenerators) {
+          lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+        }
+        insertMultiTabletsInternally(true);
+      }
+    }
 
-  @Override
-  public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
+    if (child.hasNext()) {
+      return null;
+    } else {
+      insertMultiTabletsInternally(false);
+      updateResultTsBlock();
+      return resultTsBlockBuilder.build();
+    }
   }
 
-  @Override
-  public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
+  private void updateResultTsBlock() {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (Pair<String, PartialPath> sourceTargetPathPair :
+        deviceToSourceTargetPathPairListMap.get(currentDevice)) {
+      timeColumnBuilder.writeLong(0);
+      columnBuilders[0].writeBinary(new Binary(currentDevice));
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.left));
+      columnBuilders[2].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
+      columnBuilders[3].writeInt(findWritten(sourceTargetPathPair.left));
+      resultTsBlockBuilder.declarePosition();
+    }
   }
 
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
+  private List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+        deviceToTargetPathDataTypeMap.get(currentDevice);
+    return constructInsertTabletStatementGenerators(
+        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1491623e77..6a6c2045d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -20,43 +20,26 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public class IntoOperator implements ProcessOperator {
-
-  private final OperatorContext operatorContext;
-  private final Operator child;
+public class IntoOperator extends AbstractIntoOperator {
 
-  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
   private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
-  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
@@ -66,41 +49,13 @@ public class IntoOperator implements ProcessOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
-    this.operatorContext = operatorContext;
-    this.child = child;
-    this.insertTabletStatementGenerators =
+    super(
+        operatorContext,
+        child,
         constructInsertTabletStatementGenerators(
-            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
+        sourceColumnToInputLocationMap);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
-    this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
-  }
-
-  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap) {
-    List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
-        new ArrayList<>(targetPathToSourceInputLocationMap.size());
-    for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
-      InsertTabletStatementGenerator generator =
-          new InsertTabletStatementGenerator(
-              targetDevice,
-              targetPathToSourceInputLocationMap.get(targetDevice),
-              targetPathToDataTypeMap.get(targetDevice),
-              targetDeviceToAlignedMap.get(targetDevice.toString()));
-      insertTabletStatementGenerators.add(generator);
-    }
-    return insertTabletStatementGenerators;
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
   }
 
   @Override
@@ -124,26 +79,6 @@ public class IntoOperator implements ProcessOperator {
     }
   }
 
-  private void insertMultiTabletsInternally(boolean needCheck) {
-    if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
-        || insertTabletStatementGenerators.get(0).isEmpty()) {
-      return;
-    }
-
-    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      insertTabletStatementList.add(generator.constructInsertTabletStatement());
-    }
-
-    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
-    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
-    // TODO: execute insertMultiTabletsStatement
-
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
-    }
-  }
-
   private TsBlock constructResultTsBlock() {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.selectIntoColumnHeaders.stream()
@@ -161,229 +96,4 @@ public class IntoOperator implements ProcessOperator {
     }
     return resultTsBlockBuilder.build();
   }
-
-  private int findWritten(String sourceColumn) {
-    InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      int written = generator.getWrittenCount(inputLocation);
-      if (written != -1) {
-        return written;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return child.hasNext();
-  }
-
-  @Override
-  public void close() throws Exception {
-    child.close();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return child.isFinished();
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
-  }
-
-  public static class InsertTabletStatementGenerator {
-
-    private final int TABLET_ROW_LIMIT =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-
-    private final PartialPath devicePath;
-    private final boolean isAligned;
-    private final String[] measurements;
-    private final TSDataType[] dataTypes;
-    private final InputLocation[] inputLocations;
-
-    private int rowCount = 0;
-
-    private long[] times;
-    private Object[] columns;
-    private BitMap[] bitMaps;
-
-    private final Map<InputLocation, AtomicInteger> writtenCounter;
-
-    public InsertTabletStatementGenerator(
-        PartialPath devicePath,
-        Map<String, InputLocation> measurementToInputLocationMap,
-        Map<String, TSDataType> measurementToDataTypeMap,
-        Boolean isAligned) {
-      this.devicePath = devicePath;
-      this.isAligned = isAligned;
-      this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
-      this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
-      this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
-      this.writtenCounter = new HashMap<>();
-      for (InputLocation inputLocation : inputLocations) {
-        writtenCounter.put(inputLocation, new AtomicInteger(0));
-      }
-      this.reset();
-    }
-
-    public void reset() {
-      this.rowCount = 0;
-      this.times = new long[TABLET_ROW_LIMIT];
-      this.columns = new Object[this.measurements.length];
-      for (int i = 0; i < this.measurements.length; i++) {
-        switch (dataTypes[i]) {
-          case BOOLEAN:
-            columns[i] = new boolean[TABLET_ROW_LIMIT];
-            break;
-          case INT32:
-            columns[i] = new int[TABLET_ROW_LIMIT];
-            break;
-          case INT64:
-            columns[i] = new long[TABLET_ROW_LIMIT];
-            break;
-          case FLOAT:
-            columns[i] = new float[TABLET_ROW_LIMIT];
-            break;
-          case DOUBLE:
-            columns[i] = new double[TABLET_ROW_LIMIT];
-            break;
-          case TEXT:
-            columns[i] = new Binary[TABLET_ROW_LIMIT];
-            Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
-            break;
-          default:
-            throw new UnSupportedDataTypeException(
-                String.format("Data type %s is not supported.", dataTypes[i]));
-        }
-      }
-      this.bitMaps = new BitMap[this.measurements.length];
-      for (int i = 0; i < this.bitMaps.length; ++i) {
-        this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
-        this.bitMaps[i].markAll();
-      }
-    }
-
-    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
-      for (; lastReadIndex < tsBlock.getPositionCount(); lastReadIndex++) {
-
-        times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
-
-        for (int i = 0; i < measurements.length; ++i) {
-          Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()];
-
-          // if the value is NULL
-          if (valueColumn.isNull(lastReadIndex)) {
-            // bit in bitMaps are marked as 1 (NULL) by default
-            continue;
-          }
-
-          bitMaps[i].unmark(rowCount);
-          writtenCounter.get(inputLocations[i]).getAndIncrement();
-          switch (valueColumn.getDataType()) {
-            case INT32:
-              ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
-              break;
-            case INT64:
-              ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
-              break;
-            case FLOAT:
-              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
-              break;
-            case DOUBLE:
-              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
-              break;
-            case BOOLEAN:
-              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
-              break;
-            case TEXT:
-              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
-              break;
-            default:
-              throw new UnSupportedDataTypeException(
-                  String.format(
-                      "data type %s is not supported when convert data at client",
-                      valueColumn.getDataType()));
-          }
-        }
-
-        ++rowCount;
-        if (rowCount == TABLET_ROW_LIMIT) {
-          break;
-        }
-      }
-      return lastReadIndex;
-    }
-
-    public boolean isFull() {
-      return rowCount == TABLET_ROW_LIMIT;
-    }
-
-    public boolean isEmpty() {
-      return rowCount == 0;
-    }
-
-    public InsertTabletStatement constructInsertTabletStatement() {
-      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
-      insertTabletStatement.setDevicePath(devicePath);
-      insertTabletStatement.setAligned(isAligned);
-      insertTabletStatement.setMeasurements(measurements);
-      insertTabletStatement.setDataTypes(dataTypes);
-      insertTabletStatement.setRowCount(rowCount);
-
-      if (rowCount != TABLET_ROW_LIMIT) {
-        times = Arrays.copyOf(times, rowCount);
-        for (int i = 0; i < columns.length; i++) {
-          switch (dataTypes[i]) {
-            case BOOLEAN:
-              columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount);
-              break;
-            case INT32:
-              columns[i] = Arrays.copyOf((int[]) columns[i], rowCount);
-              break;
-            case INT64:
-              columns[i] = Arrays.copyOf((long[]) columns[i], rowCount);
-              break;
-            case FLOAT:
-              columns[i] = Arrays.copyOf((float[]) columns[i], rowCount);
-              break;
-            case DOUBLE:
-              columns[i] = Arrays.copyOf((double[]) columns[i], rowCount);
-              break;
-            case TEXT:
-              columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount);
-              break;
-            default:
-              throw new UnSupportedDataTypeException(
-                  String.format("Data type %s is not supported.", dataTypes[i]));
-          }
-        }
-      }
-
-      insertTabletStatement.setTimes(times);
-      insertTabletStatement.setColumns(columns);
-      insertTabletStatement.setBitMaps(bitMaps);
-
-      return insertTabletStatement;
-    }
-
-    public int getWrittenCount(InputLocation inputLocation) {
-      if (!writtenCounter.containsKey(inputLocation)) {
-        return -1;
-      }
-      return writtenCounter.get(inputLocation).get();
-    }
-  }
 }


[iotdb] 05/12: add OperatorTreeGenerator for IntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aeff0ab7a9c32d2469a1f8362543b358dd025003
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 10:00:12 2022 +0800

    add OperatorTreeGenerator for IntoOperator
---
 .../execution/operator/process/IntoOperator.java   | 26 +++++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 52 ++++++++++++++++++++++
 .../plan/planner/plan/node/process/IntoNode.java   |  4 ++
 3 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1251bb3e50..7dc26ddc28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -55,39 +55,39 @@ public class IntoOperator implements ProcessOperator {
   private final Operator child;
 
   private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
-  private final List<Pair<String, String>> sourceTargetPathPairList;
+  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
   private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
       Operator child,
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap,
-      List<Pair<String, String>> sourceTargetPathPairList,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
-            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
   }
 
   private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+      Map<String, Boolean> targetDeviceToAlignedMap) {
     List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
-        new ArrayList<>(targetPathToSourceMap.size());
-    for (PartialPath targetDevice : targetPathToSourceMap.keySet()) {
+        new ArrayList<>(targetPathToSourceInputLocationMap.size());
+    for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
       InsertTabletStatementGenerator generator =
           new InsertTabletStatementGenerator(
               targetDevice,
-              targetPathToSourceMap.get(targetDevice),
+              targetPathToSourceInputLocationMap.get(targetDevice),
               targetPathToDataTypeMap.get(targetDevice),
-              targetDeviceToAlignedMap.get(targetDevice));
+              targetDeviceToAlignedMap.get(targetDevice.toString()));
       insertTabletStatementGenerators.add(generator);
     }
     return insertTabletStatementGenerators;
@@ -150,10 +150,10 @@ public class IntoOperator implements ProcessOperator {
     TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
     TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) {
+    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
       timeColumnBuilder.writeLong(0);
       columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
-      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right));
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
       columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
       resultTsBlockBuilder.declarePosition();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index f3de5e72d4..1dcf871373 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -137,6 +138,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -158,6 +160,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregatio
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
@@ -1336,6 +1339,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return super.visitSort(node, context);
   }
 
+  @Override
+  public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                IntoOperator.class.getSimpleName());
+
+    IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
+    Map<String, List<InputLocation>> layout = makeLayout(node);
+    Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+    for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
+      sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
+    }
+
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        new HashMap<>();
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+    Map<PartialPath, Map<String, String>> targetPathToSourceMap =
+        intoPathDescriptor.getTargetPathToSourceMap();
+    for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
+      PartialPath targetDevice = entry.getKey();
+      Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
+      Map<String, TSDataType> measurementToDataTypeMap = new HashMap<>();
+      for (Map.Entry<String, String> measurementEntry : entry.getValue().entrySet()) {
+        String targetMeasurement = measurementEntry.getKey();
+        String sourceColumn = measurementEntry.getValue();
+        measurementToInputLocationMap.put(
+            targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
+        measurementToDataTypeMap.put(
+            targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+      }
+      targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
+      targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
+    }
+
+    return new IntoOperator(
+        operatorContext,
+        child,
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        intoPathDescriptor.getTargetDeviceToAlignedMap(),
+        intoPathDescriptor.getSourceTargetPathPairList(),
+        sourceColumnToInputLocationMap);
+  }
+
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index b68e862a7a..73926bd8cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -48,6 +48,10 @@ public class IntoNode extends SingleChildProcessNode {
     this.intoPathDescriptor = intoPathDescriptor;
   }
 
+  public IntoPathDescriptor getIntoPathDescriptor() {
+    return intoPathDescriptor;
+  }
+
   @Override
   public PlanNode clone() {
     return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);


[iotdb] 10/12: finish execute insertMultiTabletsStatement

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a8d74daf90583ce962593733c2243d7b4ef10ada
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 16:17:42 2022 +0800

    finish execute insertMultiTabletsStatement
---
 .../iotdb/db/client/DataNodeInternalClient.java    | 96 ++++++++++++++++++++++
 .../iotdb/db/exception/IntoProcessException.java   | 27 ++++++
 .../operator/process/AbstractIntoOperator.java     | 20 ++++-
 .../operator/process/DeviceViewIntoOperator.java   |  3 +-
 .../iotdb/db/query/control/SessionManager.java     |  5 ++
 5 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
new file mode 100644
index 0000000000..43f7271bd2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+
+  public DataNodeInternalClient() {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+    sessionId = SESSION_MANAGER.requestInternalSessionId();
+  }
+
+  public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
+    try {
+      if (statement.isEmpty()) {
+        // return success when this statement is empty because server doesn't need to execute it
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      }
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
new file mode 100644
index 0000000000..a19b049152
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+public class IntoProcessException extends RuntimeException {
+
+  public IntoProcessException(String message) {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 28b90daae7..21cf6951fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -19,13 +19,17 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.client.DataNodeInternalClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -34,6 +38,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
@@ -51,6 +59,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
+  private final DataNodeInternalClient client = new DataNodeInternalClient();
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -95,7 +105,15 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
-    // TODO: execute insertMultiTabletsStatement
+    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
+    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      String message =
+          String.format(
+              "Error occurred while inserting tablets in SELECT INTO. %s",
+              executionStatus.getMessage());
+      LOGGER.error(message);
+      throw new IntoProcessException(message);
+    }
 
     for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       generator.reset();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index f0eab0b71a..201aaaef3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -74,7 +75,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws IntoProcessException {
     TsBlock inputTsBlock = child.next();
     if (inputTsBlock != null) {
       String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 53f140d317..1ee80d5df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,6 +237,11 @@ public class SessionManager {
     return sessionId;
   }
 
+  public long requestInternalSessionId() {
+    return requestSessionId(
+        "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
+  }
+
   public boolean releaseSessionResource(long sessionId) {
     return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
   }


[iotdb] 04/12: implement LogicalPlan for SELECT INTO statement

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 248dacce5624045b38aa1ab40441976c3d732974
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Oct 17 22:27:01 2022 +0800

    implement LogicalPlan for SELECT INTO statement
---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 26 ++++++++++++++++++++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  7 ++++++
 2 files changed, 33 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 965c4bd477..90f1219767 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -52,11 +52,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryO
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -73,8 +75,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -844,6 +848,28 @@ public class LogicalPlanBuilder {
     }
   }
 
+  public LogicalPlanBuilder planDeviceViewInto(
+      DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+    if (deviceViewIntoPathDescriptor == null) {
+      return this;
+    }
+
+    this.root =
+        new DeviceViewIntoNode(
+            context.getQueryId().genPlanNodeId(), this.getRoot(), deviceViewIntoPathDescriptor);
+    return this;
+  }
+
+  public LogicalPlanBuilder planInto(IntoPathDescriptor intoPathDescriptor) {
+    if (intoPathDescriptor == null) {
+      return this;
+    }
+
+    this.root =
+        new IntoNode(context.getQueryId().genPlanNodeId(), this.getRoot(), intoPathDescriptor);
+    return this;
+  }
+
   /** Meta Query* */
   public LogicalPlanBuilder planTimeSeriesSchemaSource(
       PartialPath pathPattern,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 2f2f954520..ccac3a83e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -158,6 +158,13 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
             .planOffset(queryStatement.getRowOffset())
             .planLimit(queryStatement.getRowLimit());
 
+    // plan select into
+    if (queryStatement.isAlignByDevice()) {
+      planBuilder = planBuilder.planDeviceViewInto(analysis.getDeviceViewIntoPathDescriptor());
+    } else {
+      planBuilder = planBuilder.planInto(analysis.getIntoPathDescriptor());
+    }
+
     return planBuilder.getRoot();
   }
 


[iotdb] 02/12: Merge branch 'lmh/mppSelectInto' into lmh/intoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbc3c41ece91f1507956233de2f0ba180c7dee5f
Merge: 2293ea4ca4 3e50f56e3e
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Oct 17 21:48:06 2022 +0800

    Merge branch 'lmh/mppSelectInto' into lmh/intoOperator
    
    # Conflicts:
    #       server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
    #       server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
    #       server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
    #       server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java

 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   7 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 177 ++++++++++++++++++---
 .../planner/plan/parameter/IntoPathDescriptor.java | 132 +++++++++++++--
 .../db/mpp/plan/statement/crud/QueryStatement.java |   3 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 ++++++++++++++++++++
 5 files changed, 449 insertions(+), 39 deletions(-)


[iotdb] 06/12: fix bugs

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4646bf1285d968ae42cae18010ba64e6ef32d48b
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 10:22:21 2022 +0800

    fix bugs
---
 .../execution/operator/process/IntoOperator.java   | 39 +++++++++++++++++++---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 14 ++++++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  1 +
 .../planner/plan/node/process/ExchangeNode.java    |  5 +++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 7dc26ddc28..1491623e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -106,12 +106,14 @@ public class IntoOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock inputTsBlock = child.next();
-    int lastReadIndex = 0;
-    while (lastReadIndex < inputTsBlock.getPositionCount()) {
-      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-        lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+    if (inputTsBlock != null) {
+      int lastReadIndex = 0;
+      while (lastReadIndex < inputTsBlock.getPositionCount()) {
+        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+          lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+        }
+        insertMultiTabletsInternally(true);
       }
-      insertMultiTabletsInternally(true);
     }
 
     if (child.hasNext()) {
@@ -234,12 +236,39 @@ public class IntoOperator implements ProcessOperator {
       for (InputLocation inputLocation : inputLocations) {
         writtenCounter.put(inputLocation, new AtomicInteger(0));
       }
+      this.reset();
     }
 
     public void reset() {
       this.rowCount = 0;
       this.times = new long[TABLET_ROW_LIMIT];
       this.columns = new Object[this.measurements.length];
+      for (int i = 0; i < this.measurements.length; i++) {
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            columns[i] = new boolean[TABLET_ROW_LIMIT];
+            break;
+          case INT32:
+            columns[i] = new int[TABLET_ROW_LIMIT];
+            break;
+          case INT64:
+            columns[i] = new long[TABLET_ROW_LIMIT];
+            break;
+          case FLOAT:
+            columns[i] = new float[TABLET_ROW_LIMIT];
+            break;
+          case DOUBLE:
+            columns[i] = new double[TABLET_ROW_LIMIT];
+            break;
+          case TEXT:
+            columns[i] = new Binary[TABLET_ROW_LIMIT];
+            Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", dataTypes[i]));
+        }
+      }
       this.bitMaps = new BitMap[this.measurements.length];
       for (int i = 0; i < this.bitMaps.length; ++i) {
         this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 90f1219767..ed4806c1b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -151,6 +151,10 @@ public class LogicalPlanBuilder {
     keys.forEach(k -> context.getTypeProvider().setType(k, dataType));
   }
 
+  private void updateTypeProviderWithConstantType(String columnName, TSDataType dataType) {
+    context.getTypeProvider().setType(columnName, dataType);
+  }
+
   public LogicalPlanBuilder planRawDataSource(
       Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) {
     List<PlanNode> sourceNodeList = new ArrayList<>();
@@ -854,6 +858,11 @@ public class LogicalPlanBuilder {
       return this;
     }
 
+    ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.forEach(
+        columnHeader -> {
+          updateTypeProviderWithConstantType(
+              columnHeader.getColumnName(), columnHeader.getColumnType());
+        });
     this.root =
         new DeviceViewIntoNode(
             context.getQueryId().genPlanNodeId(), this.getRoot(), deviceViewIntoPathDescriptor);
@@ -865,6 +874,11 @@ public class LogicalPlanBuilder {
       return this;
     }
 
+    ColumnHeaderConstant.selectIntoColumnHeaders.forEach(
+        columnHeader -> {
+          updateTypeProviderWithConstantType(
+              columnHeader.getColumnName(), columnHeader.getColumnType());
+        });
     this.root =
         new IntoNode(context.getQueryId().genPlanNodeId(), this.getRoot(), intoPathDescriptor);
     return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1dcf871373..e91d67b973 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1378,6 +1378,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
     }
 
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new IntoOperator(
         operatorContext,
         child,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 0a4b64f3de..06310be926 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -54,6 +54,11 @@ public class ExchangeNode extends SingleChildProcessNode {
     super(id);
   }
 
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitExchange(this, context);