You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/27 07:25:24 UTC

[iotdb] branch master updated: [IOTDB-2208] Reconstruct the process of generating resultset header of query (#4640)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f8e63  [IOTDB-2208] Reconstruct the process of generating resultset header of query (#4640)
e4f8e63 is described below

commit e4f8e63d448c565477ae9acfd38db88d8ba882a7
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Mon Dec 27 15:24:52 2021 +0800

    [IOTDB-2208] Reconstruct the process of generating resultset header of query (#4640)
---
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |  53 ++++-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  62 +++++-
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |  22 +-
 .../iotdb/db/qp/physical/crud/QueryIndexPlan.java  |  12 ++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  60 +++++-
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |  20 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 236 +--------------------
 7 files changed, 214 insertions(+), 251 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index f61e7a6..fbd6605 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -18,15 +18,21 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.utils.GroupByLevelController;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.thrift.TException;
+
+import java.util.*;
 
 public class AggregationPlan extends RawDataQueryPlan {
 
@@ -48,6 +54,45 @@ public class AggregationPlan extends RawDataQueryPlan {
   }
 
   @Override
+  public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
+      throws TException, MetadataException {
+    if (isGroupByLevel()) {
+      List<String> respColumns = new ArrayList<>();
+      List<String> columnsTypes = new ArrayList<>();
+
+      TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+      for (Map.Entry<String, AggregateResult> groupPathResult :
+          getGroupPathsResultMap().entrySet()) {
+        respColumns.add(groupPathResult.getKey());
+        columnsTypes.add(groupPathResult.getValue().getResultDataType().toString());
+      }
+      resp.setColumns(respColumns);
+      resp.setDataTypeList(columnsTypes);
+      return resp;
+    } else {
+      return super.getTSExecuteStatementResp(isJdbcQuery);
+    }
+  }
+
+  @Override
+  public List<TSDataType> getWideQueryHeaders(
+      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      throws MetadataException {
+    List<TSDataType> seriesTypes = new ArrayList<>();
+    List<String> aggregations = getAggregations();
+    if (aggregations.size() != paths.size()) {
+      for (int i = 1; i < paths.size(); i++) {
+        aggregations.add(aggregations.get(0));
+      }
+    }
+    for (ResultColumn resultColumn : resultColumns) {
+      respColumns.add(resultColumn.getResultColumnName());
+    }
+    seriesTypes.addAll(SchemaUtils.getSeriesTypesByPaths(paths, aggregations));
+    return seriesTypes;
+  }
+
+  @Override
   public List<String> getAggregations() {
     return aggregations;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index a4079a1..e296c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -19,14 +19,17 @@
 package org.apache.iotdb.db.qp.physical.crud;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class AlignByDevicePlan extends QueryPlan {
 
@@ -57,6 +60,61 @@ public class AlignByDevicePlan extends QueryPlan {
     // do nothing
   }
 
+  @Override
+  public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery) {
+    List<String> respColumns = new ArrayList<>();
+    List<String> columnsTypes = new ArrayList<>();
+
+    TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+    // set columns in TSExecuteStatementResp.
+    respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
+
+    // get column types and do deduplication
+    // the DEVICE column of ALIGN_BY_DEVICE result
+    columnsTypes.add(TSDataType.TEXT.toString());
+    List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
+    // the DEVICE column of ALIGN_BY_DEVICE result
+    deduplicatedColumnsType.add(TSDataType.TEXT);
+
+    Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
+    Map<String, MeasurementInfo> measurementInfoMap = getMeasurementInfoMap();
+
+    // build column header with constant and non exist column and deduplication
+    List<String> measurements = getMeasurements();
+    for (String measurement : measurements) {
+      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
+      TSDataType type = TSDataType.TEXT;
+      switch (measurementInfo.getMeasurementType()) {
+        case Exist:
+          type = measurementInfo.getColumnDataType();
+          break;
+        case NonExist:
+        case Constant:
+          type = TSDataType.TEXT;
+      }
+      String measurementAlias = measurementInfo.getMeasurementAlias();
+      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
+      columnsTypes.add(type.toString());
+
+      if (!deduplicatedMeasurements.contains(measurement)) {
+        deduplicatedMeasurements.add(measurement);
+        deduplicatedColumnsType.add(type);
+      }
+    }
+
+    // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
+    // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
+    setMeasurements(new ArrayList<>(deduplicatedMeasurements));
+    setDataTypes(deduplicatedColumnsType);
+
+    // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
+    setPaths(null);
+    resp.setColumns(respColumns);
+    resp.setDataTypeList(columnsTypes);
+    return resp;
+  }
+
   public void setMeasurements(List<String> measurements) {
     this.measurements = measurements;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index 08511f7..b2028bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -24,16 +24,18 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.service.StaticResps;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import org.apache.thrift.TException;
+
+import java.util.*;
 
 public class LastQueryPlan extends RawDataQueryPlan {
 
@@ -58,6 +60,18 @@ public class LastQueryPlan extends RawDataQueryPlan {
   }
 
   @Override
+  public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery) {
+    return StaticResps.LAST_RESP.deepCopy();
+  }
+
+  @Override
+  public List<TSDataType> getWideQueryHeaders(
+      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      throws TException {
+    throw new TException("unsupported query type: " + getOperatorType());
+  }
+
+  @Override
   public void setExpression(IExpression expression) throws QueryProcessException {
     if (isValidExpression(expression)) {
       super.setExpression(expression);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 27d20f1..8ff816f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -20,7 +20,12 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import org.apache.iotdb.db.index.common.IndexType;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import org.apache.thrift.TException;
+
+import java.util.BitSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -34,6 +39,13 @@ public class QueryIndexPlan extends RawDataQueryPlan {
     setOperatorType(OperatorType.QUERY_INDEX);
   }
 
+  @Override
+  public List<TSDataType> getWideQueryHeaders(
+      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      throws TException {
+    throw new TException("unsupported query type: " + getOperatorType());
+  }
+
   public IndexType getIndexType() {
     return indexType;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 5163da6..10afc95 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -25,13 +25,17 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.primitives.Bytes;
+import org.apache.thrift.TException;
+
+import java.util.*;
 
 public abstract class QueryPlan extends PhysicalPlan {
 
@@ -67,6 +71,54 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException;
 
+  public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
+      throws TException, MetadataException {
+    List<String> respColumns = new ArrayList<>();
+    List<String> columnsTypes = new ArrayList<>();
+
+    TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
+    List<String> respSgColumns = new ArrayList<>();
+    BitSet aliasMap = new BitSet();
+    List<TSDataType> seriesTypes =
+        getWideQueryHeaders(respColumns, respSgColumns, isJdbcQuery, aliasMap);
+    for (TSDataType seriesType : seriesTypes) {
+      columnsTypes.add(seriesType.toString());
+    }
+    resp.setColumnNameIndexMap(getPathToIndex());
+    resp.setSgColumns(respSgColumns);
+    List<Byte> byteList = new ArrayList<>();
+    byteList.addAll(Bytes.asList(aliasMap.toByteArray()));
+    resp.setAliasColumns(byteList);
+
+    resp.setColumns(respColumns);
+    resp.setDataTypeList(columnsTypes);
+    return resp;
+  }
+
+  public List<TSDataType> getWideQueryHeaders(
+      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      throws TException, MetadataException {
+    List<TSDataType> seriesTypes = new ArrayList<>();
+    for (int i = 0; i < resultColumns.size(); ++i) {
+      if (isJdbcQuery) {
+        String sgName = IoTDB.metaManager.getBelongedStorageGroup(getPaths().get(i)).getFullPath();
+        respSgColumns.add(sgName);
+        if (resultColumns.get(i).getAlias() == null) {
+          respColumns.add(
+              resultColumns.get(i).getResultColumnName().substring(sgName.length() + 1));
+        } else {
+          aliasList.set(i);
+          respColumns.add(resultColumns.get(i).getResultColumnName());
+        }
+      } else {
+        respColumns.add(resultColumns.get(i).getResultColumnName());
+      }
+      seriesTypes.add(paths.get(i).getSeriesType());
+    }
+    return seriesTypes;
+  }
+
   @Override
   public List<MeasurementPath> getPaths() {
     return paths;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index c88fe2e..999465d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -27,16 +27,11 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
@@ -87,6 +82,17 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
     }
   }
 
+  @Override
+  public List<TSDataType> getWideQueryHeaders(
+      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) {
+    List<TSDataType> seriesTypes = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      respColumns.add(resultColumns.get(i).getResultColumnName());
+      seriesTypes.add(resultColumns.get(i).getDataType());
+    }
+    return seriesTypes;
+  }
+
   protected void setDatasetOutputIndexToResultColumnIndex(
       int datasetOutputIndex, Integer originalIndex) {
     datasetOutputIndexToResultColumnIndex.put(datasetOutputIndex, originalIndex);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 771a25f..87cf042 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -35,44 +35,15 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
-import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.StaticResps;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -82,51 +53,10 @@ import org.apache.iotdb.db.service.metrics.Operation;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
-import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -135,7 +65,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-import com.google.common.primitives.Bytes;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,22 +73,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.*;
 
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface {
@@ -755,10 +673,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
   private TSExecuteStatementResp getQueryColumnHeaders(
       PhysicalPlan physicalPlan, String username, boolean isJdbcQuery)
       throws AuthException, TException, MetadataException {
-
-    List<String> respColumns = new ArrayList<>();
-    List<String> columnsTypes = new ArrayList<>();
-
     // check permissions
     if (!checkAuthorization(physicalPlan.getAuthPaths(), physicalPlan, username)) {
       return RpcUtils.getTSExecuteStatementResp(
@@ -767,145 +681,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
               "No permissions for this operation " + physicalPlan.getOperatorType()));
     }
 
-    TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
-
-    // align by device query
-    QueryPlan plan = (QueryPlan) physicalPlan;
-    if (plan instanceof AlignByDevicePlan) {
-      getAlignByDeviceQueryHeaders((AlignByDevicePlan) plan, respColumns, columnsTypes);
-    } else if (plan instanceof LastQueryPlan) {
-      // Last Query should return different respond instead of the static one
-      // because the query dataset and query id is different although the header of last query is
-      // same.
-      return StaticResps.LAST_RESP.deepCopy();
-    } else if (plan.isGroupByLevel()) {
-      for (Map.Entry<String, AggregateResult> groupPathResult :
-          ((AggregationPlan) plan).getGroupPathsResultMap().entrySet()) {
-        respColumns.add(groupPathResult.getKey());
-        columnsTypes.add(groupPathResult.getValue().getResultDataType().toString());
-      }
-    } else {
-      List<String> respSgColumns = new ArrayList<>();
-      BitSet aliasMap = new BitSet();
-      getWideQueryHeaders(plan, respColumns, columnsTypes, respSgColumns, isJdbcQuery, aliasMap);
-      resp.setColumnNameIndexMap(plan.getPathToIndex());
-      resp.setSgColumns(respSgColumns);
-      List<Byte> byteList = new ArrayList<>();
-      byteList.addAll(Bytes.asList(aliasMap.toByteArray()));
-      resp.setAliasColumns(byteList);
-    }
-    resp.setColumns(respColumns);
-    resp.setDataTypeList(columnsTypes);
-    return resp;
-  }
-
-  // wide means not align by device
-  private void getWideQueryHeaders(
-      QueryPlan plan,
-      List<String> respColumns,
-      List<String> columnTypes,
-      List<String> respSgColumns,
-      Boolean isJdbcQuery,
-      BitSet aliasList)
-      throws TException, MetadataException {
-    List<ResultColumn> resultColumns = plan.getResultColumns();
-    List<MeasurementPath> paths = plan.getPaths();
-    List<TSDataType> seriesTypes = new ArrayList<>();
-    switch (plan.getOperatorType()) {
-      case QUERY:
-      case FILL:
-        for (int i = 0; i < resultColumns.size(); ++i) {
-          if (isJdbcQuery) {
-            String sgName =
-                IoTDB.metaManager.getBelongedStorageGroup(plan.getPaths().get(i)).getFullPath();
-            respSgColumns.add(sgName);
-            if (resultColumns.get(i).getAlias() == null) {
-              respColumns.add(
-                  resultColumns.get(i).getResultColumnName().substring(sgName.length() + 1));
-            } else {
-              aliasList.set(i);
-              respColumns.add(resultColumns.get(i).getResultColumnName());
-            }
-          } else {
-            respColumns.add(resultColumns.get(i).getResultColumnName());
-          }
-          seriesTypes.add(paths.get(i).getSeriesType());
-        }
-        break;
-      case AGGREGATION:
-      case GROUP_BY_TIME:
-      case GROUP_BY_FILL:
-        List<String> aggregations = plan.getAggregations();
-        if (aggregations.size() != paths.size()) {
-          for (int i = 1; i < paths.size(); i++) {
-            aggregations.add(aggregations.get(0));
-          }
-        }
-        for (ResultColumn resultColumn : resultColumns) {
-          respColumns.add(resultColumn.getResultColumnName());
-        }
-        seriesTypes = SchemaUtils.getSeriesTypesByPaths(paths, aggregations);
-        break;
-      case UDAF:
-      case UDTF:
-        seriesTypes = new ArrayList<>();
-        for (int i = 0; i < paths.size(); i++) {
-          respColumns.add(resultColumns.get(i).getResultColumnName());
-          seriesTypes.add(resultColumns.get(i).getDataType());
-        }
-        break;
-      default:
-        throw new TException("unsupported query type: " + plan.getOperatorType());
-    }
-
-    for (TSDataType seriesType : seriesTypes) {
-      columnTypes.add(seriesType.toString());
-    }
-  }
-
-  private void getAlignByDeviceQueryHeaders(
-      AlignByDevicePlan plan, List<String> respColumns, List<String> columnTypes) {
-    // set columns in TSExecuteStatementResp.
-    respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
-
-    // get column types and do deduplication
-    columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
-    List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
-    deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
-
-    Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap();
-
-    // build column header with constant and non exist column and deduplication
-    List<String> measurements = plan.getMeasurements();
-    for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
-      TSDataType type = TSDataType.TEXT;
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          type = measurementInfo.getColumnDataType();
-          break;
-        case NonExist:
-        case Constant:
-          type = TSDataType.TEXT;
-      }
-      String measurementAlias = measurementInfo.getMeasurementAlias();
-      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
-      columnTypes.add(type.toString());
-
-      if (!deduplicatedMeasurements.contains(measurement)) {
-        deduplicatedMeasurements.add(measurement);
-        deduplicatedColumnsType.add(type);
-      }
-    }
-
-    // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
-    // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
-    plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements));
-    plan.setDataTypes(deduplicatedColumnsType);
-
-    // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
-    plan.setPaths(null);
+    return ((QueryPlan) physicalPlan).getTSExecuteStatementResp(isJdbcQuery);
   }
 
   private TSExecuteStatementResp executeSelectIntoStatement(