You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/11 07:41:41 UTC

[incubator-iotdb] branch master updated: Use a batched style to get series type (#899)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6df3d43  Use a batched style to get series type (#899)
6df3d43 is described below

commit 6df3d4354ac5f07e0a3df627b085a8d3dccf93c5
Author: Jiang Tian <jt...@163.com>
AuthorDate: Wed Mar 11 15:41:34 2020 +0800

    Use a batched style to get series type (#899)
    
    * Use a batched style to get series type in PhysicalGenerator, TSServiceImpl and ConcatPathOptimizer
    Co-authored-by: qiaojialin <64...@qq.com>
---
 .../exception/query/LogicalOperatorException.java  |   5 +
 .../exception/query/LogicalOptimizeException.java  |   5 +
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   4 +
 .../db/qp/logical/crud/BasicFunctionOperator.java  |   8 +-
 .../iotdb/db/qp/logical/crud/FilterOperator.java   |  31 ++++--
 .../iotdb/db/qp/logical/crud/InOperator.java       |   8 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  70 ++++++++----
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  13 ++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  27 +++--
 .../org/apache/iotdb/db/utils/SchemaUtils.java     | 119 +++++++++++++--------
 10 files changed, 197 insertions(+), 93 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOperatorException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOperatorException.java
index 1bfee85..b140943 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOperatorException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOperatorException.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.exception.query;
 
+import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 /**
@@ -44,4 +45,8 @@ public class LogicalOperatorException extends QueryProcessException {
     super(String.format("Unsupported type: [%s]. %s", type, message),
         TSStatusCode.LOGICAL_OPERATOR_ERROR.getStatusCode());
   }
+
+  public LogicalOperatorException(IoTDBException e) {
+    super(e);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOptimizeException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOptimizeException.java
index d41fa22..1c70f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOptimizeException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/LogicalOptimizeException.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.exception.query;
 
+import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -36,4 +37,8 @@ public class LogicalOptimizeException extends LogicalOperatorException {
     super(String.format("Unknown token in [%s]: [%s], [%s].", filterOperator, tokenInt,
         SQLConstant.tokenNames.get(tokenInt)), TSStatusCode.LOGICAL_OPTIMIZE_ERROR.getStatusCode());
   }
+
+  public LogicalOptimizeException(IoTDBException e) {
+    super(e);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index bd1089f..8379de1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.qp;
 
 import java.time.ZoneId;
+import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer;
 import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer;
 import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.read.common.Path;
 
 /**
  * provide a integration method for other user.
@@ -117,6 +119,7 @@ public class Planner {
     if (filter == null) {
       return root;
     }
+    Set<Path> pathSet = filter.getPathSet();
     RemoveNotOptimizer removeNot = new RemoveNotOptimizer();
     filter = removeNot.optimize(filter);
     DnfFilterOptimizer dnf = new DnfFilterOptimizer();
@@ -124,6 +127,7 @@ public class Planner {
     MergeSingleFilterOptimizer merge = new MergeSingleFilterOptimizer();
     filter = merge.optimize(filter);
     root.setFilterOperator(filter);
+    filter.setPathSet(pathSet);
     return root;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
index 7fb5863..a92755a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iotdb.db.qp.logical.crud;
 
+import java.util.Map;
 import java.util.Objects;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -74,9 +74,10 @@ public class BasicFunctionOperator extends FunctionOperator {
   }
 
   @Override
-  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter()
+  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
+      Map<Path, TSDataType> pathTSDataTypeHashMap)
       throws LogicalOperatorException, MetadataException {
-    TSDataType type = MManager.getInstance().getSeriesType(singlePath.toString());
+    TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
           "given seriesPath:{" + singlePath.getFullPath() + "} don't exist in metadata");
@@ -134,6 +135,7 @@ public class BasicFunctionOperator extends FunctionOperator {
     ret.tokenSymbol = tokenSymbol;
     ret.isLeaf = isLeaf;
     ret.isSingle = isSingle;
+    ret.pathSet = pathSet;
     return ret;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
index bc22acb..eebd40a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
@@ -23,11 +23,14 @@ import static org.apache.iotdb.db.qp.constant.SQLConstant.KW_OR;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
@@ -54,6 +57,8 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
   boolean isSingle = false;
   // if isSingle = false, singlePath must be null
   Path singlePath = null;
+  // all paths involved in this filter
+  Set<Path> pathSet;
 
   public FilterOperator(int tokenType) {
     super(tokenType);
@@ -105,17 +110,27 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
     return true;
   }
 
+  public void setPathSet(Set<Path> pathSet) {
+    this.pathSet = pathSet;
+  }
+
+  public Set<Path> getPathSet() {
+    return pathSet;
+  }
+
   /**
    * For a filter operator, if isSingle, call transformToSingleQueryFilter.<br> FilterOperator
    * cannot be leaf.
    *
    * @return QueryFilter in TsFile
+   * @param pathTSDataTypeHashMap data type of each path used by this filter, keyed by path
    */
-  public IExpression transformToExpression() throws QueryProcessException {
+  public IExpression transformToExpression(
+      Map<Path, TSDataType> pathTSDataTypeHashMap) throws QueryProcessException {
     if (isSingle) {
       Pair<IUnaryExpression, String> ret;
       try {
-        ret = transformToSingleQueryFilter();
+        ret = transformToSingleQueryFilter(pathTSDataTypeHashMap);
       } catch (MetadataException e) {
         throw new QueryProcessException(e);
       }
@@ -125,10 +140,10 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
         throw new LogicalOperatorException(String.valueOf(tokenIntType),
             "this filter is not leaf, but it's empty");
       }
-      IExpression retFilter = childOperators.get(0).transformToExpression();
+      IExpression retFilter = childOperators.get(0).transformToExpression(pathTSDataTypeHashMap);
       IExpression currentFilter;
       for (int i = 1; i < childOperators.size(); i++) {
-        currentFilter = childOperators.get(i).transformToExpression();
+        currentFilter = childOperators.get(i).transformToExpression(pathTSDataTypeHashMap);
         switch (tokenIntType) {
           case KW_AND:
             retFilter = BinaryExpression.and(retFilter, currentFilter);
@@ -151,21 +166,23 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
    * @return - pair.left: UnaryQueryFilter constructed by its one child; pair.right: Path
    * represented by this child.
    * @throws MetadataException exception in filter transforming
+   * @param pathTSDataTypeHashMap data type of each path used by this filter, keyed by path
    */
-  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter()
+  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
+      Map<Path, TSDataType> pathTSDataTypeHashMap)
       throws LogicalOperatorException, MetadataException {
     if (childOperators.isEmpty()) {
       throw new LogicalOperatorException(String.valueOf(tokenIntType),
           "TransformToSingleFilter: this filter is not a leaf, but it's empty.");
     }
     Pair<IUnaryExpression, String> currentPair = childOperators.get(0)
-        .transformToSingleQueryFilter();
+        .transformToSingleQueryFilter(pathTSDataTypeHashMap);
 
     IUnaryExpression retFilter = currentPair.left;
     String path = currentPair.right;
 
     for (int i = 1; i < childOperators.size(); i++) {
-      currentPair = childOperators.get(i).transformToSingleQueryFilter();
+      currentPair = childOperators.get(i).transformToSingleQueryFilter(pathTSDataTypeHashMap);
       if (!path.equals(currentPair.right)) {
         throw new LogicalOperatorException(
             "TransformToSingleFilter: paths among children are not inconsistent: one is: "
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/InOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/InOperator.java
index 3d99339..6f9c303 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/InOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/InOperator.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -75,9 +75,10 @@ public class InOperator extends FunctionOperator {
   }
 
   @Override
-  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter()
+  protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
+      Map<Path, TSDataType> pathTSDataTypeHashMap)
       throws LogicalOperatorException, MetadataException {
-    TSDataType type = MManager.getInstance().getSeriesType(singlePath.toString());
+    TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
           "given seriesPath:{" + singlePath.getFullPath() + "} don't exist in metadata");
@@ -154,6 +155,7 @@ public class InOperator extends FunctionOperator {
     ret.tokenSymbol = tokenSymbol;
     ret.isLeaf = isLeaf;
     ret.isSingle = isSingle;
+    ret.pathSet = pathSet;
     return ret;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 56f9b80..45c53a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -212,8 +212,13 @@ public class PhysicalGenerator {
     }
   }
 
-  protected TSDataType getSeriesType(String path) throws MetadataException {
-    return SchemaUtils.getSeriesType(path);
+  protected List<TSDataType> getSeriesTypes(List<String> paths,
+      String aggregation) throws MetadataException {
+    return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+  }
+
+  protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException {
+    return SchemaUtils.getSeriesTypesByPath(paths);
   }
 
   private PhysicalPlan transformQuery(QueryOperator queryOperator)
@@ -304,22 +309,22 @@ public class PhysicalGenerator {
               }
             }
 
-            for (String pathStr : actualPaths) {
-              Path path = new Path(pathStr);
+            String aggregation = originAggregations != null && !originAggregations.isEmpty() ?
+                originAggregations.get(i) : null;
+            List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation);
+            for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+              Path path = new Path(actualPaths.get(pathIdx));
 
               // check datatype consistency
               // a example of inconsistency: select s0 from root.sg1.d1, root.sg2.d3 align by device,
               // while root.sg1.d1.s0 is INT32 and root.sg2.d3.s0 is FLOAT.
-              String pathForDataType;
               String measurementChecked;
               if (originAggregations != null && !originAggregations.isEmpty()) {
-                pathForDataType = originAggregations.get(i) + "(" + path.getFullPath() + ")";
                 measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
               } else {
-                pathForDataType = path.getFullPath();
                 measurementChecked = path.getMeasurement();
               }
-              TSDataType dataType = getSeriesType(pathForDataType);
+              TSDataType dataType = dataTypes.get(pathIdx);
               if (measurementDataTypeMap.containsKey(measurementChecked)) {
                 if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
                   throw new QueryProcessException(
@@ -392,8 +397,18 @@ public class PhysicalGenerator {
       FilterOperator filterOperator = queryOperator.getFilterOperator();
 
       if (filterOperator != null) {
-        IExpression expression = filterOperator.transformToExpression();
-        ((RawDataQueryPlan) queryPlan).setExpression(expression);
+        List<Path> filterPaths = new ArrayList<>(filterOperator.getPathSet());
+        try {
+          List<TSDataType> seriesTypes = getSeriesTypes(filterPaths);
+          HashMap<Path, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
+          for (int i = 0; i < filterPaths.size(); i++) {
+            pathTSDataTypeHashMap.put(filterPaths.get(i), seriesTypes.get(i));
+          }
+          IExpression expression = filterOperator.transformToExpression(pathTSDataTypeHashMap);
+          ((RawDataQueryPlan) queryPlan).setExpression(expression);
+        } catch (MetadataException e) {
+          throw new LogicalOptimizeException(e);
+        }
       }
     }
     try {
@@ -416,11 +431,23 @@ public class PhysicalGenerator {
       FilterOperator operator)
       throws QueryProcessException {
     Map<String, IExpression> deviceToFilterMap = new HashMap<>();
+    Set<Path> filterPaths = new HashSet<>();
     for (String device : devices) {
       FilterOperator newOperator = operator.copy();
-      concatFilterPath(device, newOperator);
-
-      deviceToFilterMap.put(device, newOperator.transformToExpression());
+      concatFilterPath(device, newOperator, filterPaths);
+      // transform to a list so it can be indexed
+      List<Path> filterPathList = new ArrayList<>(filterPaths);
+      try {
+        List<TSDataType> seriesTypes = getSeriesTypes(filterPathList);
+        Map<Path, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
+        for (int i = 0; i < filterPathList.size(); i++) {
+          pathTSDataTypeHashMap.put(filterPathList.get(i), seriesTypes.get(i));
+        }
+        deviceToFilterMap.put(device, newOperator.transformToExpression(pathTSDataTypeHashMap));
+        filterPaths.clear();
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      }
     }
 
     return deviceToFilterMap;
@@ -442,10 +469,11 @@ public class PhysicalGenerator {
     return retDevices;
   }
 
-  private void concatFilterPath(String prefix, FilterOperator operator) {
+  private void concatFilterPath(String prefix, FilterOperator operator,
+      Set<Path> filterPaths) {
     if (!operator.isLeaf()) {
       for (FilterOperator child : operator.getChildren()) {
-        concatFilterPath(prefix, child);
+        concatFilterPath(prefix, child, filterPaths);
       }
       return;
     }
@@ -454,20 +482,22 @@ public class PhysicalGenerator {
 
     // do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5"
     if (SQLConstant.isReservedPath(filterPath) || filterPath.startWith(SQLConstant.ROOT)) {
+      filterPaths.add(filterPath);
       return;
     }
 
     Path concatPath = Path.addPrefixPath(filterPath, prefix);
+    filterPaths.add(concatPath);
     basicOperator.setSinglePath(concatPath);
   }
 
   private void generateDataTypes(QueryPlan queryPlan) throws MetadataException {
     List<Path> paths = queryPlan.getPaths();
-    List<TSDataType> dataTypes = new ArrayList<>(paths.size());
-    for (Path path : paths) {
-      TSDataType seriesType = getSeriesType(path.toString());
-      dataTypes.add(seriesType);
-      queryPlan.addTypeMapping(path, seriesType);
+    List<TSDataType> dataTypes = getSeriesTypes(paths);
+    for (int i = 0; i < paths.size(); i++) {
+      Path path = paths.get(i);
+      TSDataType dataType = dataTypes.get(i);
+      queryPlan.addTypeMapping(path, dataType);
     }
     queryPlan.setDataTypes(dataTypes);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index f71c60c..32a9a50 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.db.qp.strategy.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Set;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -110,12 +112,14 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
 
     // concat filter
     FilterOperator filter = sfwOperator.getFilterOperator();
+    Set<Path> filterPaths = new HashSet<>();
     if (filter == null) {
       return operator;
     }
     if(!isAlignByDevice){
-      sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter));
+      sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, filterPaths));
     }
+    filter.setPathSet(filterPaths);
     // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without prefix first
 
     return sfwOperator;
@@ -208,12 +212,13 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     }
   }
 
-  private FilterOperator concatFilter(List<Path> fromPaths, FilterOperator operator)
+  private FilterOperator concatFilter(List<Path> fromPaths, FilterOperator operator,
+      Set<Path> filterPaths)
       throws LogicalOptimizeException {
     if (!operator.isLeaf()) {
       List<FilterOperator> newFilterList = new ArrayList<>();
       for (FilterOperator child : operator.getChildren()) {
-        newFilterList.add(concatFilter(fromPaths, child));
+        newFilterList.add(concatFilter(fromPaths, child, filterPaths));
       }
       operator.setChildren(newFilterList);
       return operator;
@@ -222,11 +227,13 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     Path filterPath = functionOperator.getSinglePath();
     // do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5"
     if (SQLConstant.isReservedPath(filterPath) || filterPath.startWith(SQLConstant.ROOT)) {
+      filterPaths.add(filterPath);
       return operator;
     }
     List<Path> concatPaths = new ArrayList<>();
     fromPaths.forEach(fromPath -> concatPaths.add(Path.addPrefixPath(filterPath, fromPath)));
     List<Path> noStarPaths = removeStarsInPathWithUnique(concatPaths);
+    filterPaths.addAll(noStarPaths);
     if (noStarPaths.size() == 1) {
       // Transform "select s1 from root.car.* where s1 > 10" to
       // "select s1 from root.car.* where root.car.*.s1 > 10"
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6e4b18b..3ad965c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -306,14 +306,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     QueryResourceManager.getInstance().endQuery(queryId);
   }
 
-  private TSDataType getSeriesType(String path) throws QueryProcessException {
-    try {
-      return SchemaUtils.getSeriesType(path);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
-    }
-  }
-
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSStatus status;
@@ -332,7 +324,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
           break;
         case "COLUMN":
-          resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+          List<TSDataType> dataTypes = SchemaUtils
+              .getSeriesTypesByString(Collections.singletonList(req.getColumnPath()), null);
+          resp.setDataType(dataTypes.get(0).toString());
           status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
           break;
         case "ALL_COLUMNS":
@@ -343,7 +337,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           status = RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, req.getType());
           break;
       }
-    } catch (MetadataException | OutOfMemoryError | QueryProcessException e) {
+    } catch (MetadataException | OutOfMemoryError e) {
       logger.error(
           String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
       status = RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
@@ -642,7 +636,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private TSExecuteStatementResp getQueryResp(PhysicalPlan plan, String username)
-      throws QueryProcessException, AuthException, TException {
+      throws QueryProcessException, AuthException, TException, MetadataException {
     if (plan instanceof AuthorPlan) {
       return getAuthQueryColumnHeaders(plan);
     } else if (plan instanceof ShowPlan) {
@@ -708,7 +702,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * get ResultSet schema
    */
   private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username)
-      throws AuthException, TException, QueryProcessException {
+      throws AuthException, TException, QueryProcessException, MetadataException {
 
     List<String> respColumns = new ArrayList<>();
     List<String> columnsTypes = new ArrayList<>();
@@ -740,16 +734,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // wide means not align by device
   private void getWideQueryHeaders(
       QueryPlan plan, List<String> respColumns, List<String> columnTypes)
-      throws TException, QueryProcessException {
+      throws TException, QueryProcessException, MetadataException {
     // Restore column header of aggregate to func(column_name), only
     // support single aggregate function for now
     List<Path> paths = plan.getPaths();
+    List<TSDataType> seriesTypes;
     switch (plan.getOperatorType()) {
       case QUERY:
       case FILL:
         for (Path p : paths) {
           respColumns.add(p.getFullPath());
         }
+        seriesTypes = SchemaUtils.getSeriesTypesByString(respColumns, null);
         break;
       case AGGREGATION:
       case GROUPBY:
@@ -762,13 +758,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         for (int i = 0; i < paths.size(); i++) {
           respColumns.add(aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")");
         }
+        seriesTypes = SchemaUtils.getSeriesTypesByPath(paths, aggregations);
         break;
       default:
         throw new TException("unsupported query type: " + plan.getOperatorType());
     }
 
-    for (String column : respColumns) {
-      columnTypes.add(getSeriesType(column).toString());
+    for (TSDataType seriesType : seriesTypes) {
+      columnTypes.add(seriesType.toString());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 8ae97d8..0b1df97 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -18,12 +18,8 @@
  */
 package org.apache.iotdb.db.utils;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -96,46 +92,85 @@ public class SchemaUtils {
     }
 
   }
+  public static List<TSDataType> getSeriesTypesByPath(Collection<Path> paths) throws MetadataException {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Path path : paths) {
+      dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+    }
+    return dataTypes;
+  }
 
-  public static TSDataType getSeriesType(String path)
-      throws MetadataException {
-    switch (path.toLowerCase()) {
-      // authorization queries
-      case COLUMN_ROLE:
-      case COLUMN_USER:
-      case COLUMN_PRIVILEGE:
-      case COLUMN_STORAGE_GROUP:
-        return TSDataType.TEXT;
-      case SQLConstant.RESERVED_TIME:
-      case COLUMN_TTL:
-        return TSDataType.INT64;
-      default:
-        // do nothing
+  /**
+   *
+   * @param paths time series paths
+   * @param aggregation aggregation function, may be null
+   * @return the aggregation's type for every path, or each path's own type if it has no fixed type
+   */
+  public static List<TSDataType> getSeriesTypesByString(Collection<String> paths,
+      String aggregation) throws MetadataException {
+    TSDataType dataType = getAggregationType(aggregation);
+    if (dataType != null) {
+      return Collections.nCopies(paths.size(), dataType);
+    }
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (String path : paths) {
+      dataTypes.add(MManager.getInstance().getSeriesType(path));
     }
+    return dataTypes;
+  }
 
-    if (path.contains("(") && !path.startsWith("(") && path.endsWith(")")) {
-      // aggregation
-      int leftBracketIndex = path.indexOf('(');
-      String aggrType = path.substring(0, leftBracketIndex);
-      String innerPath = path.substring(leftBracketIndex + 1, path.length() - 1);
-      switch (aggrType.toLowerCase()) {
-        case SQLConstant.MIN_TIME:
-        case SQLConstant.MAX_TIME:
-        case SQLConstant.COUNT:
-          return TSDataType.INT64;
-        case SQLConstant.LAST_VALUE:
-        case SQLConstant.FIRST_VALUE:
-        case SQLConstant.MIN_VALUE:
-        case SQLConstant.MAX_VALUE:
-          return getSeriesType(innerPath);
-        case SQLConstant.AVG:
-        case SQLConstant.SUM:
-          return TSDataType.DOUBLE;
-        default:
-          throw new MetadataException(
-              "aggregate does not support " + aggrType + " function.");
+  public static List<TSDataType> getSeriesTypesByPath(Collection<Path> paths,
+      String aggregation) throws MetadataException {
+    TSDataType dataType = getAggregationType(aggregation);
+    if (dataType != null) {
+      return Collections.nCopies(paths.size(), dataType);
+    }
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Path path : paths) {
+      dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+    }
+    return dataTypes;
+  }
+
+  public static List<TSDataType> getSeriesTypesByPath(List<Path> paths,
+      List<String> aggregations) throws MetadataException {
+    List<TSDataType> tsDataTypes = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      TSDataType dataType = getAggregationType(aggregations.get(i));
+      if (dataType != null) {
+        tsDataTypes.add(dataType);
+      } else {
+        tsDataTypes.add(MManager.getInstance().getSeriesType(paths.get(i).getFullPath()));
       }
     }
-    return MManager.getInstance().getSeriesType(path);
+    return tsDataTypes;
+  }
+
+  /**
+   *
+   * @param aggregation aggregation function
+   * @return the aggregation's data type, or null if aggregation is null or keeps the series type
+   */
+  public static TSDataType getAggregationType(String aggregation) throws MetadataException {
+    if (aggregation == null) {
+      return null;
+    }
+    switch (aggregation.toLowerCase()) {
+      case SQLConstant.MIN_TIME:
+      case SQLConstant.MAX_TIME:
+      case SQLConstant.COUNT:
+        return TSDataType.INT64;
+      case SQLConstant.LAST_VALUE:
+      case SQLConstant.FIRST_VALUE:
+      case SQLConstant.MIN_VALUE:
+      case SQLConstant.MAX_VALUE:
+        return null;
+      case SQLConstant.AVG:
+      case SQLConstant.SUM:
+        return TSDataType.DOUBLE;
+      default:
+        throw new MetadataException(
+            "aggregate does not support " + aggregation + " function.");
+    }
   }
 }