Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/02 03:00:27 UTC

[iotdb] branch master updated: Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)

This is an automated email from the ASF dual-hosted git repository.

qijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1140cf2da6 Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)
1140cf2da6 is described below

commit 1140cf2da6aae64f28f9ac36b97c6048756228e6
Author: Jian Zhang <38...@users.noreply.github.com>
AuthorDate: Tue Aug 2 11:00:21 2022 +0800

    Influxdb service adapts to distributed MPP framework and fixes some bugs in InfluxFunction and TagInfoRecord (#6828)
    
    * Influxdb service adapts to distributed MPP framework
    
    * fix InfluxFirstFunction, InfluxLastFunction, InfluxMeanFunction
    
    * Using the '.*' form of import should be avoided
    
    * spotless apply
    
    * influxdb mpp test
    
    * influxdb mpp test
    
    * Modified to rely on an AbstractInfluxDBMetaManager
    
    * Generate tag timestamps using AtomicLong
---
 docker/src/main/Dockerfile-single-influxdb         |   4 +-
 .../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java |  40 +-
 .../function/aggregator/InfluxMeanFunction.java    |   4 +-
 .../function/selector/InfluxFirstFunction.java     |   5 +-
 .../function/selector/InfluxLastFunction.java      |   5 +-
 .../influxdb/handler/AbstractQueryHandler.java     | 511 +++++++++++++++++++++
 .../protocol/influxdb/handler/NewQueryHandler.java | 200 ++++++++
 .../db/protocol/influxdb/handler/QueryHandler.java | 468 +------------------
 .../influxdb/meta/AbstractInfluxDBMetaManager.java | 114 +++++
 .../influxdb/meta/InfluxDBMetaManager.java         |  96 +---
 .../influxdb/meta/NewInfluxDBMetaManager.java      | 129 ++++++
 .../db/protocol/influxdb/meta/TagInfoRecords.java  |  28 +-
 .../influxdb/util/InfluxReqAndRespUtils.java       |  55 +++
 .../protocol/influxdb/util/QueryResultUtils.java   | 153 ++++++
 .../db/protocol/influxdb/util/StringUtils.java     |  11 +
 .../iotdb/db/service/InfluxDBRPCService.java       |  26 +-
 .../handler/InfluxDBServiceThriftHandler.java      |  10 +-
 .../thrift/impl/IInfluxDBServiceWithHandler.java   |  25 +
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  20 +-
 .../thrift/impl/NewInfluxDBServiceImpl.java        | 128 ++++++
 20 files changed, 1444 insertions(+), 588 deletions(-)
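
The diffstat above includes TagInfoRecords.java, and the commit message notes that tag timestamps are now generated with an AtomicLong. A minimal sketch of that idea, assuming a single shared monotonic counter (the class and method names below are illustrative, not the actual TagInfoRecords code):

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of AtomicLong-based tag timestamps; not the real TagInfoRecords class.
    public class TagTimestampSketch {
      // One shared, monotonically increasing counter for all tag-info records.
      private static final AtomicLong TAG_TIMESTAMP = new AtomicLong(0);

      // Each new tag record gets a unique, strictly increasing timestamp, so records
      // created within the same millisecond can no longer collide.
      public static long nextTagTimestamp() {
        return TAG_TIMESTAMP.incrementAndGet();
      }
    }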

diff --git a/docker/src/main/Dockerfile-single-influxdb b/docker/src/main/Dockerfile-single-influxdb
index 8319075d5b..ff15b26cfb 100644
--- a/docker/src/main/Dockerfile-single-influxdb
+++ b/docker/src/main/Dockerfile-single-influxdb
@@ -33,7 +33,7 @@ RUN apt update \
   && apt autoremove -y \
   && apt purge --auto-remove -y \
   && apt clean -y
-RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/start-new-server.sh
 RUN dos2unix /iotdb/sbin/../conf/datanode-env.sh
 EXPOSE 6667
 EXPOSE 31999
@@ -43,4 +43,4 @@ EXPOSE 8181
 VOLUME /iotdb/data
 VOLUME /iotdb/logs
 ENV PATH="/iotdb/sbin/:/iotdb/tools/:${PATH}"
-ENTRYPOINT ["/iotdb/sbin/start-server.sh"]
+ENTRYPOINT ["/iotdb/sbin/start-new-server.sh"]
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
index 1c7019bb54..ac7c1ce984 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.protocol.influxdb.dto;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.DataTypeUtils;
 import org.apache.iotdb.db.utils.ParameterUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.influxdb.dto.Point;
@@ -57,7 +58,8 @@ public class IoTDBPoint {
     this.values = values;
   }
 
-  public IoTDBPoint(String database, Point point, InfluxDBMetaManager metaManager) {
+  public IoTDBPoint(
+      String database, Point point, AbstractInfluxDBMetaManager metaManager, long sessionID) {
     String measurement = null;
     Map<String, String> tags = new HashMap<>();
     Map<String, Object> fields = new HashMap<>();
@@ -67,8 +69,8 @@ public class IoTDBPoint {
     for (java.lang.reflect.Field reflectField : point.getClass().getDeclaredFields()) {
       reflectField.setAccessible(true);
       try {
-        if (reflectField.getType().getName().equalsIgnoreCase("java.util.concurrent.TimeUnit")
-            && reflectField.getName().equalsIgnoreCase("precision")) {
+        if ("java.util.concurrent.TimeUnit".equalsIgnoreCase(reflectField.getType().getName())
+            && "precision".equalsIgnoreCase(reflectField.getName())) {
           precision = (TimeUnit) reflectField.get(point);
         }
       } catch (IllegalAccessException e) {
@@ -79,17 +81,17 @@ public class IoTDBPoint {
     for (java.lang.reflect.Field reflectField : point.getClass().getDeclaredFields()) {
       reflectField.setAccessible(true);
       try {
-        if (reflectField.getType().getName().equalsIgnoreCase("java.util.Map")
-            && reflectField.getName().equalsIgnoreCase("fields")) {
+        if ("java.util.Map".equalsIgnoreCase(reflectField.getType().getName())
+            && "fields".equalsIgnoreCase(reflectField.getName())) {
           fields = (Map<String, Object>) reflectField.get(point);
-        } else if (reflectField.getType().getName().equalsIgnoreCase("java.util.Map")
-            && reflectField.getName().equalsIgnoreCase("tags")) {
+        } else if ("java.util.Map".equalsIgnoreCase(reflectField.getType().getName())
+            && "tags".equalsIgnoreCase(reflectField.getName())) {
           tags = (Map<String, String>) reflectField.get(point);
-        } else if (reflectField.getType().getName().equalsIgnoreCase("java.lang.String")
-            && reflectField.getName().equalsIgnoreCase("measurement")) {
+        } else if ("java.lang.String".equalsIgnoreCase(reflectField.getType().getName())
+            && "measurement".equalsIgnoreCase(reflectField.getName())) {
           measurement = (String) reflectField.get(point);
-        } else if (reflectField.getType().getName().equalsIgnoreCase("java.lang.Number")
-            && reflectField.getName().equalsIgnoreCase("time")) {
+        } else if ("java.lang.Number".equalsIgnoreCase(reflectField.getType().getName())
+            && "time".equalsIgnoreCase(reflectField.getName())) {
           time = (Long) reflectField.get(point);
           time = TimeUnit.MILLISECONDS.convert(time, precision);
         }
@@ -103,7 +105,7 @@ public class IoTDBPoint {
     }
     ParameterUtils.checkNonEmptyString(database, "database");
     ParameterUtils.checkNonEmptyString(measurement, "measurement name");
-    String path = metaManager.generatePath(database, measurement, tags);
+    String path = metaManager.generatePath(database, measurement, tags, sessionID);
     List<String> measurements = new ArrayList<>();
     List<TSDataType> types = new ArrayList<>();
     List<Object> values = new ArrayList<>();
@@ -149,4 +151,16 @@ public class IoTDBPoint {
         DataTypeUtils.getValueBuffer(getTypes(), getValues()),
         false);
   }
+
+  public TSInsertRecordReq convertToTSInsertRecordReq(long sessionID)
+      throws IoTDBConnectionException {
+    TSInsertRecordReq tsInsertRecordReq = new TSInsertRecordReq();
+    tsInsertRecordReq.setValues(DataTypeUtils.getValueBuffer(getTypes(), getValues()));
+    tsInsertRecordReq.setMeasurements(getMeasurements());
+    tsInsertRecordReq.setPrefixPath(getDeviceId());
+    tsInsertRecordReq.setIsAligned(false);
+    tsInsertRecordReq.setTimestamp(getTime());
+    tsInsertRecordReq.setSessionId(sessionID);
+    return tsInsertRecordReq;
+  }
 }
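
The new convertToTSInsertRecordReq(long sessionID) above packages a parsed InfluxDB point as a TSInsertRecordReq for the session-based RPC path used by the MPP framework. A hedged sketch of how a caller might combine it with the session-aware constructor (the helper class and the hard-coded point below are simplifications for illustration, not the actual service code):

    import java.util.concurrent.TimeUnit;

    import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint;
    import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
    import org.apache.iotdb.rpc.IoTDBConnectionException;
    import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;

    import org.influxdb.dto.Point;

    // Sketch only: how the session-aware constructor and convertToTSInsertRecordReq fit together.
    public class InfluxWriteSketch {
      public static TSInsertRecordReq toRecordReq(
          String database, AbstractInfluxDBMetaManager metaManager, long sessionId)
          throws IoTDBConnectionException {
        // Build an InfluxDB point with the standard client API.
        Point point =
            Point.measurement("cpu")
                .tag("host", "server-1")
                .addField("usage", 0.73)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .build();
        // The meta manager resolves the tags to a device path for this session.
        IoTDBPoint iotdbPoint = new IoTDBPoint(database, point, metaManager, sessionId);
        // New in this commit: produce a TSInsertRecordReq for the session-based RPC service.
        return iotdbPoint.convertToTSInsertRecordReq(sessionId);
      }
    }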
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
index fb1b936e2a..1337cc701b 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
@@ -29,7 +29,7 @@ import java.util.List;
 
 public class InfluxMeanFunction extends InfluxAggregator {
   private final List<Double> numbers = new ArrayList<>();
-  long sum = 0;
+  double sum = 0;
   long count = 0;
 
   public InfluxMeanFunction(List<Expression> expressionList) {
@@ -66,7 +66,7 @@ public class InfluxMeanFunction extends InfluxAggregator {
     if (functionValues.length == 1) {
       count += (long) functionValues[0].getValue();
     } else if (functionValues.length == 2) {
-      sum += (long) functionValues[1].getValue();
+      sum += (double) functionValues[1].getValue();
     }
   }
 }
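
The InfluxMeanFunction fix above switches the running sum from long to double, matching the double values returned by the sum aggregation, so the mean assembled from two separate aggregations (a count and a sum, as the MEAN branch of NewQueryHandler further down shows) is no longer distorted. A standalone illustration of that accumulation (the class name is made up):

    // Standalone illustration of the count + sum accumulation; not the IoTDB class itself.
    public class MeanAccumulatorSketch {
      private double sum = 0; // double, so fractional sums are preserved (the fix above)
      private long count = 0;

      public void addCount(long c) {
        count += c;
      }

      public void addSum(double s) {
        sum += s;
      }

      public double mean() {
        return count == 0 ? Double.NaN : sum / count;
      }

      public static void main(String[] args) {
        MeanAccumulatorSketch acc = new MeanAccumulatorSketch();
        acc.addCount(3);
        acc.addSum(1.5 + 2.5 + 3.0); // fractional values need the double accumulator
        System.out.println(acc.mean()); // 2.333...
      }
    }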
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
index 0599b8ddc5..9ec5dbcdbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
@@ -45,10 +45,7 @@ public class InfluxFirstFunction extends InfluxSelector {
 
   @Override
   public void updateValueIoTDBFunc(InfluxFunctionValue... functionValues) {
-    if (value == null && getTimestamp() == null) {
-      value = functionValues[0].getValue();
-      setTimestamp(functionValues[0].getTimestamp());
-    } else if (getTimestamp() < functionValues[0].getTimestamp()) {
+    if (functionValues[0].getTimestamp() < getTimestamp()) {
       value = functionValues[0].getValue();
       setTimestamp(functionValues[0].getTimestamp());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
index be9cce9119..062f3f5df6 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
@@ -45,10 +45,7 @@ public class InfluxLastFunction extends InfluxSelector {
 
   @Override
   public void updateValueIoTDBFunc(InfluxFunctionValue... functionValues) {
-    if (value == null && getTimestamp() == null) {
-      value = functionValues[0].getValue();
-      setTimestamp(functionValues[0].getTimestamp());
-    } else if (getTimestamp() > functionValues[0].getTimestamp()) {
+    if (functionValues[0].getTimestamp() > getTimestamp()) {
       value = functionValues[0].getValue();
       setTimestamp(functionValues[0].getTimestamp());
     }
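
Both selector fixes above drop the explicit null check and keep only the timestamp comparison; this works if the selector's timestamp starts at Long.MAX_VALUE for first and Long.MIN_VALUE for last, which is an assumption about the constructors, not something shown in this diff. A standalone sketch of the intended update rule:

    // Sketch of the first/last selector update rule. The initial values are an
    // assumption about the constructors, which are not part of this diff.
    public class FirstLastSketch {
      private long firstTimestamp = Long.MAX_VALUE; // assumed initial value
      private Object firstValue;
      private long lastTimestamp = Long.MIN_VALUE; // assumed initial value
      private Object lastValue;

      public void update(long timestamp, Object value) {
        // FIRST keeps the earliest point seen so far.
        if (timestamp < firstTimestamp) {
          firstTimestamp = timestamp;
          firstValue = value;
        }
        // LAST keeps the latest point seen so far.
        if (timestamp > lastTimestamp) {
          lastTimestamp = timestamp;
          lastValue = value;
        }
      }
    }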
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java
new file mode 100644
index 0000000000..78899f7e34
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.handler;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.ResultColumn;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionFactory;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
+import org.apache.iotdb.db.protocol.influxdb.function.aggregator.InfluxAggregator;
+import org.apache.iotdb.db.protocol.influxdb.function.selector.InfluxSelector;
+import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxSelectComponent;
+import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.JacksonUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
+import org.apache.iotdb.db.qp.constant.FilterConstant;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.service.basic.ServiceProvider;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractQueryHandler {
+
+  abstract Map<String, Integer> getFieldOrders(
+      String database, String measurement, ServiceProvider serviceProvider, long sessionId);
+
+  abstract InfluxFunctionValue updateByIoTDBFunc(
+      InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid);
+
+  abstract QueryResult queryByConditions(
+      String querySql,
+      String database,
+      String measurement,
+      ServiceProvider serviceProvider,
+      Map<String, Integer> fieldOrders,
+      long sessionId)
+      throws AuthException;
+
+  public final InfluxQueryResultRsp queryInfluxDB(
+      String database,
+      InfluxQueryOperator queryOperator,
+      long sessionId,
+      ServiceProvider serviceProvider) {
+    String measurement = queryOperator.getFromComponent().getPrefixPaths().get(0).getFullPath();
+    // The fields under the current measurement and their order according to the specified rules
+    Map<String, Integer> fieldOrders =
+        getFieldOrders(database, measurement, serviceProvider, sessionId);
+    QueryResult queryResult;
+    InfluxQueryResultRsp tsQueryResultRsp = new InfluxQueryResultRsp();
+    try {
+      // contains a filter condition, or is a common query whose result is obtained by traversal.
+      if (queryOperator.getWhereComponent() != null
+          || queryOperator.getSelectComponent().isHasCommonQuery()
+          || queryOperator.getSelectComponent().isHasOnlyTraverseFunction()) {
+        // step1 : generate query results
+        queryResult =
+            queryExpr(
+                queryOperator.getWhereComponent() != null
+                    ? queryOperator.getWhereComponent().getFilterOperator()
+                    : null,
+                database,
+                measurement,
+                serviceProvider,
+                fieldOrders,
+                sessionId);
+        // step2 : select filter
+        ProcessSelectComponent(queryResult, queryOperator.getSelectComponent());
+      }
+      // no filter condition and only functions: use the built-in IoTDB aggregation functions.
+      else {
+        queryResult =
+            queryFuncWithoutFilter(
+                queryOperator.getSelectComponent(),
+                database,
+                measurement,
+                serviceProvider,
+                sessionId);
+      }
+      return tsQueryResultRsp
+          .setResultJsonString(JacksonUtils.bean2Json(queryResult))
+          .setStatus(RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
+    } catch (AuthException e) {
+      return tsQueryResultRsp.setStatus(
+          RpcUtils.getInfluxDBStatus(
+              TSStatusCode.UNINITIALIZED_AUTH_ERROR.getStatusCode(), e.getMessage()));
+    }
+  }
+
+  /**
+   * conditions are generated from subtrees of unique conditions
+   *
+   * @param basicFunctionOperator subtree to generate condition
+   * @return corresponding conditions
+   */
+  public IExpression getIExpressionForBasicFunctionOperator(
+      BasicFunctionOperator basicFunctionOperator) {
+    return new SingleSeriesExpression(
+        basicFunctionOperator.getSinglePath(),
+        FilterUtils.filterTypeToFilter(
+            basicFunctionOperator.getFilterType(), basicFunctionOperator.getValue()));
+  }
+
+  /**
+   * further process the obtained query result through the query criteria of select
+   *
+   * @param queryResult query results to be processed
+   * @param selectComponent select conditions to be filtered
+   */
+  public void ProcessSelectComponent(
+      QueryResult queryResult, InfluxSelectComponent selectComponent) {
+
+    // get the row order map of the current data result first
+    List<String> columns = queryResult.getResults().get(0).getSeries().get(0).getColumns();
+    Map<String, Integer> columnOrders = new HashMap<>();
+    for (int i = 0; i < columns.size(); i++) {
+      columnOrders.put(columns.get(i), i);
+    }
+    // get current values
+    List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
+    // new columns
+    List<String> newColumns = new ArrayList<>();
+    newColumns.add(InfluxSQLConstant.RESERVED_TIME);
+
+    // when have function
+    if (selectComponent.isHasFunction()) {
+      List<InfluxFunction> functions = new ArrayList<>();
+      for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+        Expression expression = resultColumn.getExpression();
+        if (expression instanceof FunctionExpression) {
+          String functionName = ((FunctionExpression) expression).getFunctionName();
+          functions.add(
+              InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
+          newColumns.add(functionName);
+        } else if (expression instanceof TimeSeriesOperand) {
+          String columnName = ((TimeSeriesOperand) expression).getPath().getFullPath();
+          if (!columnName.equals(InfluxSQLConstant.STAR)) {
+            newColumns.add(columnName);
+          } else {
+            newColumns.addAll(columns.subList(1, columns.size()));
+          }
+        }
+      }
+      for (List<Object> value : values) {
+        for (InfluxFunction function : functions) {
+          List<Expression> expressions = function.getExpressions();
+          if (expressions == null) {
+            throw new IllegalArgumentException("not support param");
+          }
+          TimeSeriesOperand parmaExpression = (TimeSeriesOperand) expressions.get(0);
+          String parmaName = parmaExpression.getPath().getFullPath();
+          if (columnOrders.containsKey(parmaName)) {
+            Object selectedValue = value.get(columnOrders.get(parmaName));
+            Long selectedTimestamp = (Long) value.get(0);
+            if (selectedValue != null) {
+              // selector function
+              if (function instanceof InfluxSelector) {
+                ((InfluxSelector) function)
+                    .updateValueAndRelateValues(
+                        new InfluxFunctionValue(selectedValue, selectedTimestamp), value);
+              } else {
+                // aggregate function
+                ((InfluxAggregator) function)
+                    .updateValueBruteForce(
+                        new InfluxFunctionValue(selectedValue, selectedTimestamp));
+              }
+            }
+          }
+        }
+      }
+      List<Object> value = new ArrayList<>();
+      values = new ArrayList<>();
+      // after the data is constructed, the final results are generated
+      // First, judge whether there are common queries. If there are, a selector function is allowed
+      // without aggregate functions
+      if (selectComponent.isHasCommonQuery()) {
+        InfluxSelector selector = (InfluxSelector) functions.get(0);
+        List<Object> relatedValue = selector.getRelatedValues();
+        for (String column : newColumns) {
+          if (InfluxSQLConstant.getNativeSelectorFunctionNames().contains(column)) {
+            value.add(selector.calculateBruteForce().getValue());
+          } else {
+            if (relatedValue != null) {
+              value.add(relatedValue.get(columnOrders.get(column)));
+            }
+          }
+        }
+      } else {
+        // If there are no common queries, they are all function queries
+        for (InfluxFunction function : functions) {
+          if (value.size() == 0) {
+            value.add(function.calculateBruteForce().getTimestamp());
+          } else {
+            value.set(0, function.calculateBruteForce().getTimestamp());
+          }
+          value.add(function.calculateBruteForce().getValue());
+        }
+        if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
+          value.set(0, 0);
+        }
+      }
+      values.add(value);
+    }
+    // if it is not a function query, it is only a common query
+    else if (selectComponent.isHasCommonQuery()) {
+      // start traversing the scope of the select
+      for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+        Expression expression = resultColumn.getExpression();
+        if (expression instanceof TimeSeriesOperand) {
+          // not star case
+          if (!((TimeSeriesOperand) expression)
+              .getPath()
+              .getFullPath()
+              .equals(InfluxSQLConstant.STAR)) {
+            newColumns.add(((TimeSeriesOperand) expression).getPath().getFullPath());
+          } else {
+            newColumns.addAll(columns.subList(1, columns.size()));
+          }
+        }
+      }
+      List<List<Object>> newValues = new ArrayList<>();
+      for (List<Object> value : values) {
+        List<Object> tmpValue = new ArrayList<>();
+        for (String newColumn : newColumns) {
+          tmpValue.add(value.get(columnOrders.get(newColumn)));
+        }
+        newValues.add(tmpValue);
+      }
+      values = newValues;
+    }
+    QueryResultUtils.updateQueryResultColumnValue(
+        queryResult, StringUtils.removeDuplicate(newColumns), values);
+  }
+
+  /**
+   * Query the select result. By default, there are no filter conditions. The functions to be
+   * queried use the built-in iotdb functions
+   *
+   * @param selectComponent select data to query
+   * @return select query result
+   */
+  public final QueryResult queryFuncWithoutFilter(
+      InfluxSelectComponent selectComponent,
+      String database,
+      String measurement,
+      ServiceProvider serviceProvider,
+      long sessionid) {
+    // columns
+    List<String> columns = new ArrayList<>();
+    columns.add(InfluxSQLConstant.RESERVED_TIME);
+
+    List<InfluxFunction> functions = new ArrayList<>();
+    String path = "root." + database + "." + measurement;
+    for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
+      Expression expression = resultColumn.getExpression();
+      if (expression instanceof FunctionExpression) {
+        String functionName = ((FunctionExpression) expression).getFunctionName();
+        functions.add(
+            InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
+        columns.add(functionName);
+      }
+    }
+
+    List<Object> value = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+    for (InfluxFunction function : functions) {
+      InfluxFunctionValue functionValue =
+          updateByIoTDBFunc(function, serviceProvider, path, sessionid);
+      //      InfluxFunctionValue functionValue = function.calculateByIoTDBFunc();
+      if (value.size() == 0) {
+        value.add(functionValue.getTimestamp());
+      } else {
+        value.set(0, functionValue.getTimestamp());
+      }
+      value.add(functionValue.getValue());
+    }
+    if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
+      value.set(0, 0);
+    }
+    values.add(value);
+
+    // generate series
+    QueryResult queryResult = new QueryResult();
+    QueryResult.Series series = new QueryResult.Series();
+    series.setColumns(columns);
+    series.setValues(values);
+    series.setName(measurement);
+    QueryResult.Result result = new QueryResult.Result();
+    result.setSeries(new ArrayList<>(Arrays.asList(series)));
+    queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
+    return queryResult;
+  }
+
+  public QueryResult queryExpr(
+      FilterOperator operator,
+      String database,
+      String measurement,
+      ServiceProvider serviceProvider,
+      Map<String, Integer> fieldOrders,
+      Long sessionId)
+      throws AuthException {
+    if (operator == null) {
+      List<IExpression> expressions = new ArrayList<>();
+      return queryByConditions(
+          expressions, database, measurement, serviceProvider, fieldOrders, sessionId);
+    } else if (operator instanceof BasicFunctionOperator) {
+      List<IExpression> iExpressions = new ArrayList<>();
+      iExpressions.add(getIExpressionForBasicFunctionOperator((BasicFunctionOperator) operator));
+      return queryByConditions(
+          iExpressions, database, measurement, serviceProvider, fieldOrders, sessionId);
+    } else {
+      FilterOperator leftOperator = operator.getChildren().get(0);
+      FilterOperator rightOperator = operator.getChildren().get(1);
+      if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
+        return QueryResultUtils.orQueryResultProcess(
+            queryExpr(leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
+            queryExpr(
+                rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
+      } else if (operator.getFilterType() == FilterConstant.FilterType.KW_AND) {
+        if (canMergeOperator(leftOperator) && canMergeOperator(rightOperator)) {
+          List<IExpression> iExpressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
+          List<IExpression> iExpressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
+          iExpressions1.addAll(iExpressions2);
+          return queryByConditions(
+              iExpressions1, database, measurement, serviceProvider, fieldOrders, sessionId);
+        } else {
+          return QueryResultUtils.andQueryResultProcess(
+              queryExpr(
+                  leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
+              queryExpr(
+                  rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
+        }
+      }
+    }
+    throw new IllegalArgumentException("unknown operator " + operator);
+  }
+
+  /**
+   * get query results in the format of influxdb through conditions
+   *
+   * @param expressions list of conditions, including tag and field condition
+   * @return returns the results of the influxdb query
+   */
+  private QueryResult queryByConditions(
+      List<IExpression> expressions,
+      String database,
+      String measurement,
+      ServiceProvider serviceProvider,
+      Map<String, Integer> fieldOrders,
+      Long sessionId)
+      throws AuthException {
+    // used to store the actual order according to the tag
+    Map<Integer, SingleSeriesExpression> realTagOrders = new HashMap<>();
+    // stores a list of conditions belonging to the field
+    List<SingleSeriesExpression> fieldExpressions = new ArrayList<>();
+    // maximum number of tags in the current query criteria
+    int currentQueryMaxTagNum = 0;
+    Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
+    for (IExpression expression : expressions) {
+      SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression);
+      // the current condition is in tag
+      if (tagOrders.containsKey(singleSeriesExpression.getSeriesPath().getFullPath())) {
+        int curOrder = tagOrders.get(singleSeriesExpression.getSeriesPath().getFullPath());
+        // put it into the map according to the tag
+        realTagOrders.put(curOrder, singleSeriesExpression);
+        // update the maximum tag order of the current query criteria
+        currentQueryMaxTagNum = Math.max(currentQueryMaxTagNum, curOrder);
+      } else {
+        fieldExpressions.add(singleSeriesExpression);
+      }
+    }
+    // construct the actual query path
+    StringBuilder curQueryPath = new StringBuilder("root." + database + "." + measurement);
+    // the maximum number of traversals from 1 to the current query condition
+    for (int i = 1; i <= currentQueryMaxTagNum; i++) {
+      if (realTagOrders.containsKey(i)) {
+        // since it is the value in the path, you need to remove the quotation marks at the
+        // beginning and end
+        curQueryPath
+            .append(".")
+            .append(
+                StringUtils.removeQuotation(
+                    FilterUtils.getFilterStringValue(realTagOrders.get(i).getFilter())));
+      } else {
+        curQueryPath.append(".").append("*");
+      }
+    }
+    if (currentQueryMaxTagNum < tagOrders.size()) {
+      curQueryPath.append(".**");
+    }
+    // construct actual query condition
+    StringBuilder realIotDBCondition = new StringBuilder();
+    for (int i = 0; i < fieldExpressions.size(); i++) {
+      SingleSeriesExpression singleSeriesExpression = fieldExpressions.get(i);
+      if (i != 0) {
+        realIotDBCondition.append(" and ");
+      }
+      realIotDBCondition
+          .append(singleSeriesExpression.getSeriesPath().getFullPath())
+          .append(" ")
+          .append((FilterUtils.getFilerSymbol(singleSeriesExpression.getFilter())))
+          .append(" ")
+          .append(FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter()));
+    }
+    // actual query SQL statement
+    String realQuerySql;
+
+    realQuerySql = "select * from " + curQueryPath;
+    if (!(realIotDBCondition.length() == 0)) {
+      realQuerySql += " where " + realIotDBCondition;
+    }
+    realQuerySql += " align by device";
+    return queryByConditions(
+        realQuerySql, database, measurement, serviceProvider, fieldOrders, sessionId);
+  }
+
+  /**
+   * generate query conditions from the syntax tree (reaching this function implies the tree can be
+   * merged, i.e. it contains no OR)
+   *
+   * @param filterOperator the syntax tree of query criteria needs to be generated
+   * @return condition list
+   */
+  public List<IExpression> getIExpressionByFilterOperatorOperator(FilterOperator filterOperator) {
+    if (filterOperator instanceof BasicFunctionOperator) {
+      // It must be a non-or situation
+      List<IExpression> expressions = new ArrayList<>();
+      expressions.add(
+          getIExpressionForBasicFunctionOperator((BasicFunctionOperator) filterOperator));
+      return expressions;
+    } else {
+      FilterOperator leftOperator = filterOperator.getChildren().get(0);
+      FilterOperator rightOperator = filterOperator.getChildren().get(1);
+      List<IExpression> expressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
+      List<IExpression> expressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
+      expressions1.addAll(expressions2);
+      return expressions1;
+    }
+  }
+
+  /**
+   * judge whether the subtrees of the syntax tree have or operations. If not, the query can be
+   * merged
+   *
+   * @param operator subtree to judge
+   * @return can merge queries
+   */
+  public boolean canMergeOperator(FilterOperator operator) {
+    if (operator instanceof BasicFunctionOperator) {
+      return true;
+    } else {
+      if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
+        return false;
+      } else {
+        FilterOperator leftOperator = operator.getChildren().get(0);
+        FilterOperator rightOperator = operator.getChildren().get(1);
+        return canMergeOperator(leftOperator) && canMergeOperator(rightOperator);
+      }
+    }
+  }
+
+  public void checkInfluxDBQueryOperator(Operator operator) {
+    if (!(operator instanceof InfluxQueryOperator)) {
+      throw new IllegalArgumentException("not query sql");
+    }
+    InfluxSelectComponent selectComponent = ((InfluxQueryOperator) operator).getSelectComponent();
+    if (selectComponent.isHasMoreSelectorFunction() && selectComponent.isHasCommonQuery()) {
+      throw new IllegalArgumentException(
+          "ERR: mixing multiple selector functions with tags or fields is not supported");
+    }
+    if (selectComponent.isHasAggregationFunction() && selectComponent.isHasCommonQuery()) {
+      throw new IllegalArgumentException(
+          "ERR: mixing aggregate and non-aggregate queries is not supported");
+    }
+  }
+}
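
AbstractQueryHandler is a template: the final queryInfluxDB drives the flow, while getFieldOrders, updateByIoTDBFunc and queryByConditions are left to subclasses (QueryHandler for the old standalone engine, NewQueryHandler for the session/MPP RPC path, both below). A skeleton of what another backend would have to supply, with placeholder bodies rather than real logic, placed in the handler package so the package-private hooks can be overridden:

    package org.apache.iotdb.db.protocol.influxdb.handler;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
    import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
    import org.apache.iotdb.db.service.basic.ServiceProvider;

    import org.influxdb.dto.QueryResult;

    // Skeleton only: the three hooks a concrete handler has to implement.
    public class ExampleQueryHandler extends AbstractQueryHandler {

      @Override
      public Map<String, Integer> getFieldOrders(
          String database, String measurement, ServiceProvider serviceProvider, long sessionId) {
        // Map each field name to its column position: time first, then tags, then fields.
        return new HashMap<>();
      }

      @Override
      public InfluxFunctionValue updateByIoTDBFunc(
          InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) {
        // Push the InfluxDB aggregation down to the storage engine, then combine the results.
        return function.calculateByIoTDBFunc();
      }

      @Override
      public QueryResult queryByConditions(
          String querySql,
          String database,
          String measurement,
          ServiceProvider serviceProvider,
          Map<String, Integer> fieldOrders,
          long sessionId) {
        // Run the generated "select ... align by device" SQL and convert the rows
        // into an InfluxDB-style QueryResult.
        return new QueryResult();
      }
    }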
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java
new file mode 100644
index 0000000000..ee8d0db5c0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.handler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
+import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
+import org.apache.iotdb.db.service.basic.ServiceProvider;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.QueryResult;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NewQueryHandler extends AbstractQueryHandler {
+
+  public static TSExecuteStatementResp executeStatement(String sql, long sessionId) {
+    TSExecuteStatementReq tsExecuteStatementReq = new TSExecuteStatementReq();
+    tsExecuteStatementReq.setStatement(sql);
+    tsExecuteStatementReq.setSessionId(sessionId);
+    tsExecuteStatementReq.setStatementId(
+        NewInfluxDBServiceImpl.getClientRPCService().requestStatementId(sessionId));
+    tsExecuteStatementReq.setFetchSize(InfluxConstant.DEFAULT_FETCH_SIZE);
+    TSExecuteStatementResp executeStatementResp =
+        NewInfluxDBServiceImpl.getClientRPCService().executeStatement(tsExecuteStatementReq);
+    TSStatus tsStatus = executeStatementResp.getStatus();
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new InfluxDBException(tsStatus.getMessage());
+    }
+    return executeStatementResp;
+  }
+
+  @Override
+  public Map<String, Integer> getFieldOrders(
+      String database, String measurement, ServiceProvider serviceProvider, long sessionID) {
+    Map<String, Integer> fieldOrders = new HashMap<>();
+    String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**";
+    TSExecuteStatementResp executeStatementResp = executeStatement(showTimeseriesSql, sessionID);
+    List<String> paths = QueryResultUtils.getFullPaths(executeStatementResp);
+    Map<String, Integer> tagOrders = NewInfluxDBMetaManager.getTagOrders(database, measurement);
+    int tagOrderNums = tagOrders.size();
+    int fieldNums = 0;
+    for (String path : paths) {
+      String filed = StringUtils.getFieldByPath(path);
+      if (!fieldOrders.containsKey(filed)) {
+        // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags,
+        // and finally all fields)
+        fieldOrders.put(filed, tagOrderNums + fieldNums + 1);
+        fieldNums++;
+      }
+    }
+    return fieldOrders;
+  }
+
+  @Override
+  public InfluxFunctionValue updateByIoTDBFunc(
+      InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) {
+    switch (function.getFunctionName()) {
+      case InfluxSQLConstant.COUNT:
+        {
+          String functionSql =
+              StringUtils.generateFunctionSql(
+                  function.getFunctionName(), function.getParmaName(), path);
+          TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+          List<InfluxFunctionValue> list =
+              QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+          for (InfluxFunctionValue influxFunctionValue : list) {
+            function.updateValueIoTDBFunc(influxFunctionValue);
+          }
+          break;
+        }
+      case InfluxSQLConstant.MEAN:
+        {
+          String functionSqlCount =
+              StringUtils.generateFunctionSql("count", function.getParmaName(), path);
+          TSExecuteStatementResp tsExecuteStatementResp =
+              executeStatement(functionSqlCount, sessionid);
+          List<InfluxFunctionValue> list =
+              QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+          for (InfluxFunctionValue influxFunctionValue : list) {
+            function.updateValueIoTDBFunc(influxFunctionValue);
+          }
+          String functionSqlSum =
+              StringUtils.generateFunctionSql("sum", function.getParmaName(), path);
+          tsExecuteStatementResp = executeStatement(functionSqlSum, sessionid);
+          list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+          for (InfluxFunctionValue influxFunctionValue : list) {
+            function.updateValueIoTDBFunc(null, influxFunctionValue);
+          }
+          break;
+        }
+      case InfluxSQLConstant.SUM:
+        {
+          String functionSql =
+              StringUtils.generateFunctionSql("sum", function.getParmaName(), path);
+          TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+          List<InfluxFunctionValue> list =
+              QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+          for (InfluxFunctionValue influxFunctionValue : list) {
+            function.updateValueIoTDBFunc(influxFunctionValue);
+          }
+          break;
+        }
+      case InfluxSQLConstant.FIRST:
+      case InfluxSQLConstant.LAST:
+        {
+          String functionSql;
+          String functionName;
+          if (function.getFunctionName().equals(InfluxSQLConstant.FIRST)) {
+            functionSql =
+                StringUtils.generateFunctionSql("first_value", function.getParmaName(), path);
+            functionName = "first_value";
+          } else {
+            functionSql =
+                StringUtils.generateFunctionSql("last_value", function.getParmaName(), path);
+            functionName = "last_value";
+          }
+          TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+          Map<String, Object> map = QueryResultUtils.getColumnNameAndValue(tsExecuteStatementResp);
+          for (String colume : map.keySet()) {
+            Object o = map.get(colume);
+            String fullPath = colume.substring(functionName.length() + 1, colume.length() - 1);
+            String devicePath = StringUtils.getDeviceByPath(fullPath);
+            String specificSql =
+                String.format(
+                    "select %s from %s where %s=%s",
+                    function.getParmaName(), devicePath, fullPath, o);
+            TSExecuteStatementResp resp = executeStatement(specificSql, sessionid);
+            List<InfluxFunctionValue> list = QueryResultUtils.getInfluxFunctionValues(resp);
+            for (InfluxFunctionValue influxFunctionValue : list) {
+              function.updateValueIoTDBFunc(influxFunctionValue);
+            }
+          }
+          break;
+        }
+      case InfluxSQLConstant.MAX:
+      case InfluxSQLConstant.MIN:
+        {
+          String functionSql;
+          if (function.getFunctionName().equals(InfluxSQLConstant.MAX)) {
+            functionSql =
+                StringUtils.generateFunctionSql("max_value", function.getParmaName(), path);
+          } else {
+            functionSql =
+                StringUtils.generateFunctionSql("min_value", function.getParmaName(), path);
+          }
+          TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid);
+          List<InfluxFunctionValue> list =
+              QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp);
+          for (InfluxFunctionValue influxFunctionValue : list) {
+            function.updateValueIoTDBFunc(influxFunctionValue);
+          }
+          break;
+        }
+      default:
+        throw new IllegalStateException("Unexpected value: " + function.getFunctionName());
+    }
+    return function.calculateByIoTDBFunc();
+  }
+
+  @Override
+  public QueryResult queryByConditions(
+      String querySql,
+      String database,
+      String measurement,
+      ServiceProvider serviceProvider,
+      Map<String, Integer> fieldOrders,
+      long sessionId) {
+    TSExecuteStatementResp executeStatementResp = executeStatement(querySql, sessionId);
+    return QueryResultUtils.iotdbResultConvertInfluxResult(
+        executeStatementResp, database, measurement, fieldOrders);
+  }
+}
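
getFieldOrders above derives the column layout from a show timeseries query: column 0 is the timestamp, columns 1..tagNum are tags, and fields start at tagNum + 1. A standalone illustration of that numbering with made-up names (two tags and two fields give usage -> 3, load -> 4):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone illustration of the layout used above: time, then all tags, then all fields.
    public class FieldOrderSketch {
      public static Map<String, Integer> fieldOrders(int tagNum, List<String> fields) {
        Map<String, Integer> orders = new LinkedHashMap<>();
        int fieldNums = 0;
        for (String field : fields) {
          if (!orders.containsKey(field)) {
            orders.put(field, tagNum + fieldNums + 1);
            fieldNums++;
          }
        }
        return orders;
      }

      public static void main(String[] args) {
        // Two tags (e.g. host, region) and two fields.
        System.out.println(fieldOrders(2, Arrays.asList("usage", "load")));
        // Prints {usage=3, load=4}
      }
    }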
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
index 113da3912f..b58b65f6bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
@@ -23,43 +23,23 @@ import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.ResultColumn;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
 import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant;
 import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction;
-import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionFactory;
 import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
-import org.apache.iotdb.db.protocol.influxdb.function.aggregator.InfluxAggregator;
-import org.apache.iotdb.db.protocol.influxdb.function.selector.InfluxSelector;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
-import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
-import org.apache.iotdb.db.protocol.influxdb.operator.InfluxSelectComponent;
 import org.apache.iotdb.db.protocol.influxdb.util.FieldUtils;
-import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils;
-import org.apache.iotdb.db.protocol.influxdb.util.JacksonUtils;
 import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
 import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;
-import org.apache.iotdb.db.qp.constant.FilterConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.apache.thrift.TException;
@@ -69,205 +49,15 @@ import org.influxdb.dto.QueryResult;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class QueryHandler {
+public class QueryHandler extends AbstractQueryHandler {
 
-  public static InfluxQueryResultRsp queryInfluxDB(
-      String database,
-      InfluxQueryOperator queryOperator,
-      long sessionId,
-      ServiceProvider serviceProvider) {
-    String measurement = queryOperator.getFromComponent().getPrefixPaths().get(0).getFullPath();
-    // The list of fields under the current measurement and the order of the specified rules
-    Map<String, Integer> fieldOrders = getFieldOrders(database, measurement, serviceProvider);
-    QueryResult queryResult;
-    InfluxQueryResultRsp tsQueryResultRsp = new InfluxQueryResultRsp();
-    try {
-      // contain filter condition or have common query the result of by traversal.
-      if (queryOperator.getWhereComponent() != null
-          || queryOperator.getSelectComponent().isHasCommonQuery()
-          || queryOperator.getSelectComponent().isHasOnlyTraverseFunction()) {
-        // step1 : generate query results
-        queryResult =
-            queryExpr(
-                queryOperator.getWhereComponent() != null
-                    ? queryOperator.getWhereComponent().getFilterOperator()
-                    : null,
-                database,
-                measurement,
-                serviceProvider,
-                fieldOrders,
-                sessionId);
-        // step2 : select filter
-        ProcessSelectComponent(queryResult, queryOperator.getSelectComponent());
-      }
-      // don't contain filter condition and only have function use iotdb function.
-      else {
-        queryResult =
-            queryFuncWithoutFilter(
-                queryOperator.getSelectComponent(), database, measurement, serviceProvider);
-      }
-      return tsQueryResultRsp
-          .setResultJsonString(JacksonUtils.bean2Json(queryResult))
-          .setStatus(RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
-    } catch (AuthException e) {
-      return tsQueryResultRsp.setStatus(
-          RpcUtils.getInfluxDBStatus(
-              TSStatusCode.UNINITIALIZED_AUTH_ERROR.getStatusCode(), e.getMessage()));
-    }
-  }
-
-  /**
-   * conditions are generated from subtrees of unique conditions
-   *
-   * @param basicFunctionOperator subtree to generate condition
-   * @return corresponding conditions
-   */
-  public static IExpression getIExpressionForBasicFunctionOperator(
-      BasicFunctionOperator basicFunctionOperator) {
-    return new SingleSeriesExpression(
-        basicFunctionOperator.getSinglePath(),
-        FilterUtils.filterTypeToFilter(
-            basicFunctionOperator.getFilterType(), basicFunctionOperator.getValue()));
-  }
-
-  /**
-   * further process the obtained query result through the query criteria of select
-   *
-   * @param queryResult query results to be processed
-   * @param selectComponent select conditions to be filtered
-   */
-  public static void ProcessSelectComponent(
-      QueryResult queryResult, InfluxSelectComponent selectComponent) {
-
-    // get the row order map of the current data result first
-    List<String> columns = queryResult.getResults().get(0).getSeries().get(0).getColumns();
-    Map<String, Integer> columnOrders = new HashMap<>();
-    for (int i = 0; i < columns.size(); i++) {
-      columnOrders.put(columns.get(i), i);
-    }
-    // get current values
-    List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
-    // new columns
-    List<String> newColumns = new ArrayList<>();
-    newColumns.add(InfluxSQLConstant.RESERVED_TIME);
-
-    // when have function
-    if (selectComponent.isHasFunction()) {
-      List<InfluxFunction> functions = new ArrayList<>();
-      for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
-        Expression expression = resultColumn.getExpression();
-        if (expression instanceof FunctionExpression) {
-          String functionName = ((FunctionExpression) expression).getFunctionName();
-          functions.add(
-              InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
-          newColumns.add(functionName);
-        } else if (expression instanceof TimeSeriesOperand) {
-          String columnName = ((TimeSeriesOperand) expression).getPath().getFullPath();
-          if (!columnName.equals(InfluxSQLConstant.STAR)) {
-            newColumns.add(columnName);
-          } else {
-            newColumns.addAll(columns.subList(1, columns.size()));
-          }
-        }
-      }
-      for (List<Object> value : values) {
-        for (InfluxFunction function : functions) {
-          List<Expression> expressions = function.getExpressions();
-          if (expressions == null) {
-            throw new IllegalArgumentException("not support param");
-          }
-          TimeSeriesOperand parmaExpression = (TimeSeriesOperand) expressions.get(0);
-          String parmaName = parmaExpression.getPath().getFullPath();
-          if (columnOrders.containsKey(parmaName)) {
-            Object selectedValue = value.get(columnOrders.get(parmaName));
-            Long selectedTimestamp = (Long) value.get(0);
-            if (selectedValue != null) {
-              // selector function
-              if (function instanceof InfluxSelector) {
-                ((InfluxSelector) function)
-                    .updateValueAndRelateValues(
-                        new InfluxFunctionValue(selectedValue, selectedTimestamp), value);
-              } else {
-                // aggregate function
-                ((InfluxAggregator) function)
-                    .updateValueBruteForce(
-                        new InfluxFunctionValue(selectedValue, selectedTimestamp));
-              }
-            }
-          }
-        }
-      }
-      List<Object> value = new ArrayList<>();
-      values = new ArrayList<>();
-      // after the data is constructed, the final results are generated
-      // First, judge whether there are common queries. If there are, a selector function is allowed
-      // without aggregate functions
-      if (selectComponent.isHasCommonQuery()) {
-        InfluxSelector selector = (InfluxSelector) functions.get(0);
-        List<Object> relatedValue = selector.getRelatedValues();
-        for (String column : newColumns) {
-          if (InfluxSQLConstant.getNativeSelectorFunctionNames().contains(column)) {
-            value.add(selector.calculateBruteForce().getValue());
-          } else {
-            if (relatedValue != null) {
-              value.add(relatedValue.get(columnOrders.get(column)));
-            }
-          }
-        }
-      } else {
-        // If there are no common queries, they are all function queries
-        for (InfluxFunction function : functions) {
-          if (value.size() == 0) {
-            value.add(function.calculateBruteForce().getTimestamp());
-          } else {
-            value.set(0, function.calculateBruteForce().getTimestamp());
-          }
-          value.add(function.calculateBruteForce().getValue());
-        }
-        if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
-          value.set(0, 0);
-        }
-      }
-      values.add(value);
-    }
-    // if it is not a function query, it is only a common query
-    else if (selectComponent.isHasCommonQuery()) {
-      // start traversing the scope of the select
-      for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
-        Expression expression = resultColumn.getExpression();
-        if (expression instanceof TimeSeriesOperand) {
-          // not star case
-          if (!((TimeSeriesOperand) expression)
-              .getPath()
-              .getFullPath()
-              .equals(InfluxSQLConstant.STAR)) {
-            newColumns.add(((TimeSeriesOperand) expression).getPath().getFullPath());
-          } else {
-            newColumns.addAll(columns.subList(1, columns.size()));
-          }
-        }
-      }
-      List<List<Object>> newValues = new ArrayList<>();
-      for (List<Object> value : values) {
-        List<Object> tmpValue = new ArrayList<>();
-        for (String newColumn : newColumns) {
-          tmpValue.add(value.get(columnOrders.get(newColumn)));
-        }
-        newValues.add(tmpValue);
-      }
-      values = newValues;
-    }
-    QueryResultUtils.updateQueryResultColumnValue(
-        queryResult, StringUtils.removeDuplicate(newColumns), values);
-  }
-
-  public static Map<String, Integer> getFieldOrders(
-      String database, String measurement, ServiceProvider serviceProvider) {
+  @Override
+  public Map<String, Integer> getFieldOrders(
+      String database, String measurement, ServiceProvider serviceProvider, long sessionID) {
     Map<String, Integer> fieldOrders = new HashMap<>();
     long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
     try {
@@ -312,65 +102,9 @@ public class QueryHandler {
     return fieldOrders;
   }
 
-  /**
-   * Query the select result. By default, there are no filter conditions. The functions to be
-   * queried use the built-in iotdb functions
-   *
-   * @param selectComponent select data to query
-   * @return select query result
-   */
-  public static QueryResult queryFuncWithoutFilter(
-      InfluxSelectComponent selectComponent,
-      String database,
-      String measurement,
-      ServiceProvider serviceProvider) {
-    // columns
-    List<String> columns = new ArrayList<>();
-    columns.add(InfluxSQLConstant.RESERVED_TIME);
-
-    List<InfluxFunction> functions = new ArrayList<>();
-    String path = "root." + database + "." + measurement;
-    for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
-      Expression expression = resultColumn.getExpression();
-      if (expression instanceof FunctionExpression) {
-        String functionName = ((FunctionExpression) expression).getFunctionName();
-        functions.add(
-            InfluxFunctionFactory.generateFunction(functionName, expression.getExpressions()));
-        columns.add(functionName);
-      }
-    }
-
-    List<Object> value = new ArrayList<>();
-    List<List<Object>> values = new ArrayList<>();
-    for (InfluxFunction function : functions) {
-      InfluxFunctionValue functionValue = updateByIoTDBFunc(function, serviceProvider, path);
-      //      InfluxFunctionValue functionValue = function.calculateByIoTDBFunc();
-      if (value.size() == 0) {
-        value.add(functionValue.getTimestamp());
-      } else {
-        value.set(0, functionValue.getTimestamp());
-      }
-      value.add(functionValue.getValue());
-    }
-    if (selectComponent.isHasAggregationFunction() || selectComponent.isHasMoreFunction()) {
-      value.set(0, 0);
-    }
-    values.add(value);
-
-    // generate series
-    QueryResult queryResult = new QueryResult();
-    QueryResult.Series series = new QueryResult.Series();
-    series.setColumns(columns);
-    series.setValues(values);
-    series.setName(measurement);
-    QueryResult.Result result = new QueryResult.Result();
-    result.setSeries(new ArrayList<>(Arrays.asList(series)));
-    queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
-    return queryResult;
-  }
-
-  private static InfluxFunctionValue updateByIoTDBFunc(
-      InfluxFunction function, ServiceProvider serviceProvider, String path) {
+  @Override
+  public InfluxFunctionValue updateByIoTDBFunc(
+      InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) {
     switch (function.getFunctionName()) {
       case InfluxSQLConstant.COUNT:
         {
@@ -666,7 +400,9 @@ public class QueryHandler {
                     RowRecord recordNew = queryDataSetNew.next();
                     List<Field> newFields = recordNew.getFields();
                     long time = recordNew.getTimestamp();
-                    function.updateValueIoTDBFunc(new InfluxFunctionValue(newFields.get(0), time));
+                    function.updateValueIoTDBFunc(
+                        new InfluxFunctionValue(
+                            FieldUtils.iotdbFieldConvert(newFields.get(0)), time));
                   }
                 }
               }
@@ -740,145 +476,19 @@ public class QueryHandler {
     return function.calculateByIoTDBFunc();
   }
 
-  public static void checkInfluxDBQueryOperator(Operator operator) {
-    if (!(operator instanceof InfluxQueryOperator)) {
-      throw new IllegalArgumentException("not query sql");
-    }
-    InfluxSelectComponent selectComponent = ((InfluxQueryOperator) operator).getSelectComponent();
-    if (selectComponent.isHasMoreSelectorFunction() && selectComponent.isHasCommonQuery()) {
-      throw new IllegalArgumentException(
-          "ERR: mixing multiple selector functions with tags or fields is not supported");
-    }
-    if (selectComponent.isHasAggregationFunction() && selectComponent.isHasCommonQuery()) {
-      throw new IllegalArgumentException(
-          "ERR: mixing aggregate and non-aggregate queries is not supported");
-    }
-  }
-
-  public static QueryResult queryExpr(
-      FilterOperator operator,
-      String database,
-      String measurement,
-      ServiceProvider serviceProvider,
-      Map<String, Integer> fieldOrders,
-      Long sessionId)
-      throws AuthException {
-    if (operator == null) {
-      List<IExpression> expressions = new ArrayList<>();
-      return queryByConditions(
-          expressions, database, measurement, serviceProvider, fieldOrders, sessionId);
-    } else if (operator instanceof BasicFunctionOperator) {
-      List<IExpression> iExpressions = new ArrayList<>();
-      iExpressions.add(getIExpressionForBasicFunctionOperator((BasicFunctionOperator) operator));
-      return queryByConditions(
-          iExpressions, database, measurement, serviceProvider, fieldOrders, sessionId);
-    } else {
-      FilterOperator leftOperator = operator.getChildren().get(0);
-      FilterOperator rightOperator = operator.getChildren().get(1);
-      if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
-        return QueryResultUtils.orQueryResultProcess(
-            queryExpr(leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
-            queryExpr(
-                rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
-      } else if (operator.getFilterType() == FilterConstant.FilterType.KW_AND) {
-        if (canMergeOperator(leftOperator) && canMergeOperator(rightOperator)) {
-          List<IExpression> iExpressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
-          List<IExpression> iExpressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
-          iExpressions1.addAll(iExpressions2);
-          return queryByConditions(
-              iExpressions1, database, measurement, serviceProvider, fieldOrders, sessionId);
-        } else {
-          return QueryResultUtils.andQueryResultProcess(
-              queryExpr(
-                  leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId),
-              queryExpr(
-                  rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId));
-        }
-      }
-    }
-    throw new IllegalArgumentException("unknown operator " + operator);
-  }
-
-  /**
-   * get query results in the format of influxdb through conditions
-   *
-   * @param expressions list of conditions, including tag and field condition
-   * @return returns the results of the influxdb query
-   */
-  private static QueryResult queryByConditions(
-      List<IExpression> expressions,
+  @Override
+  public QueryResult queryByConditions(
+      String querySql,
       String database,
       String measurement,
       ServiceProvider serviceProvider,
       Map<String, Integer> fieldOrders,
-      Long sessionId)
+      long sessionId)
       throws AuthException {
-    // used to store the actual order according to the tag
-    Map<Integer, SingleSeriesExpression> realTagOrders = new HashMap<>();
-    // stores a list of conditions belonging to the field
-    List<SingleSeriesExpression> fieldExpressions = new ArrayList<>();
-    // maximum number of tags in the current query criteria
-    int currentQueryMaxTagNum = 0;
-    Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
-    for (IExpression expression : expressions) {
-      SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression);
-      // the current condition is in tag
-      if (tagOrders.containsKey(singleSeriesExpression.getSeriesPath().getFullPath())) {
-        int curOrder = tagOrders.get(singleSeriesExpression.getSeriesPath().getFullPath());
-        // put it into the map according to the tag
-        realTagOrders.put(curOrder, singleSeriesExpression);
-        // update the maximum tag order of the current query criteria
-        currentQueryMaxTagNum = Math.max(currentQueryMaxTagNum, curOrder);
-      } else {
-        fieldExpressions.add(singleSeriesExpression);
-      }
-    }
-    // construct the actual query path
-    StringBuilder curQueryPath = new StringBuilder("root." + database + "." + measurement);
-    // the maximum number of traversals from 1 to the current query condition
-    for (int i = 1; i <= currentQueryMaxTagNum; i++) {
-      if (realTagOrders.containsKey(i)) {
-        // since it is the value in the path, you need to remove the quotation marks at the
-        // beginning and end
-        curQueryPath
-            .append(".")
-            .append(
-                StringUtils.removeQuotation(
-                    FilterUtils.getFilterStringValue(realTagOrders.get(i).getFilter())));
-      } else {
-        curQueryPath.append(".").append("*");
-      }
-    }
-    if (currentQueryMaxTagNum < tagOrders.size()) {
-      curQueryPath.append(".**");
-    }
-    // construct actual query condition
-    StringBuilder realIotDBCondition = new StringBuilder();
-    for (int i = 0; i < fieldExpressions.size(); i++) {
-      SingleSeriesExpression singleSeriesExpression = fieldExpressions.get(i);
-      if (i != 0) {
-        realIotDBCondition.append(" and ");
-      }
-      realIotDBCondition
-          .append(singleSeriesExpression.getSeriesPath().getFullPath())
-          .append(" ")
-          .append((FilterUtils.getFilerSymbol(singleSeriesExpression.getFilter())))
-          .append(" ")
-          .append(FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter()));
-    }
-    // actual query SQL statement
-    String realQuerySql;
-
-    realQuerySql = "select * from " + curQueryPath;
-    if (!(realIotDBCondition.length() == 0)) {
-      realQuerySql += " where " + realIotDBCondition;
-    }
-    realQuerySql += " align by device";
-
     long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
     try {
       QueryPlan queryPlan =
-          (QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(realQuerySql);
+          (QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(querySql);
       TSStatus tsStatus = SessionManager.getInstance().checkAuthority(queryPlan, sessionId);
       if (tsStatus != null) {
         throw new AuthException(tsStatus.getMessage());
@@ -888,7 +498,7 @@ public class QueryHandler {
               queryId,
               true,
               System.currentTimeMillis(),
-              realQuerySql,
+              querySql,
               InfluxConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
       QueryDataSet queryDataSet =
           serviceProvider.createQueryDataSet(
@@ -908,50 +518,4 @@ public class QueryHandler {
       ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
     }
   }
-
-  /**
-   * generate query conditions through the syntax tree (if you enter this function, it means that it
-   * must be a syntax tree that can be merged, and there is no or)
-   *
-   * @param filterOperator the syntax tree of query criteria needs to be generated
-   * @return condition list
-   */
-  public static List<IExpression> getIExpressionByFilterOperatorOperator(
-      FilterOperator filterOperator) {
-    if (filterOperator instanceof BasicFunctionOperator) {
-      // It must be a non-or situation
-      List<IExpression> expressions = new ArrayList<>();
-      expressions.add(
-          getIExpressionForBasicFunctionOperator((BasicFunctionOperator) filterOperator));
-      return expressions;
-    } else {
-      FilterOperator leftOperator = filterOperator.getChildren().get(0);
-      FilterOperator rightOperator = filterOperator.getChildren().get(1);
-      List<IExpression> expressions1 = getIExpressionByFilterOperatorOperator(leftOperator);
-      List<IExpression> expressions2 = getIExpressionByFilterOperatorOperator(rightOperator);
-      expressions1.addAll(expressions2);
-      return expressions1;
-    }
-  }
-
-  /**
-   * judge whether the subtrees of the syntax tree have or operations. If not, the query can be
-   * merged
-   *
-   * @param operator subtree to judge
-   * @return can merge queries
-   */
-  public static boolean canMergeOperator(FilterOperator operator) {
-    if (operator instanceof BasicFunctionOperator) {
-      return true;
-    } else {
-      if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) {
-        return false;
-      } else {
-        FilterOperator leftOperator = operator.getChildren().get(0);
-        FilterOperator rightOperator = operator.getChildren().get(1);
-        return canMergeOperator(leftOperator) && canMergeOperator(rightOperator);
-      }
-    }
-  }
 }
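
With the move to AbstractQueryHandler, queryByConditions no longer assembles the IoTDB SQL itself; it receives querySql ready-made and only parses, authorizes and executes it. The removed block above shows how that SQL used to be derived from the tag and field predicates: tag values pinned by the filter become concrete path levels, unpinned layers become wildcards, and field predicates go into the WHERE clause of an "align by device" statement. A minimal, self-contained sketch of that derivation in standard Java (names such as buildQuerySql are illustrative and not part of this patch):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: rebuilds the SQL shape that the removed queryByConditions used to
    // assemble and that the refactored handlers now pass in as a ready-made querySql string.
    public class InfluxToIoTDBSqlSketch {

      static String buildQuerySql(
          String database,
          String measurement,
          Map<Integer, String> tagValuesByOrder, // tag layer order -> tag value pinned by the filter
          int totalTagCount,
          List<String> fieldConditions) { // already-rendered field predicates, e.g. "temperature > 30"
        StringBuilder path = new StringBuilder("root." + database + "." + measurement);
        int maxUsedOrder = tagValuesByOrder.keySet().stream().max(Integer::compareTo).orElse(0);
        for (int i = 1; i <= maxUsedOrder; i++) {
          // a pinned tag value becomes a concrete path level, the layers in between become wildcards
          path.append(".").append(tagValuesByOrder.getOrDefault(i, "*"));
        }
        if (maxUsedOrder < totalTagCount) {
          path.append(".**"); // remaining tag layers are covered by a multi-level wildcard
        }
        String sql = "select * from " + path;
        if (!fieldConditions.isEmpty()) {
          sql += " where " + String.join(" and ", fieldConditions);
        }
        return sql + " align by device";
      }

      public static void main(String[] args) {
        Map<Integer, String> pinnedTags = new LinkedHashMap<>();
        pinnedTags.put(1, "beijing"); // only the tag on layer 1 appears in the InfluxQL filter
        System.out.println(
            buildQuerySql("monitor", "factory", pinnedTags, 2, Arrays.asList("temperature > 30")));
        // select * from root.monitor.factory.beijing.** where temperature > 30 align by device
      }
    }
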
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java
new file mode 100644
index 0000000000..513b06e59f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.meta;
+
+import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractInfluxDBMetaManager {
+
+  protected static final String SELECT_TAG_INFO_SQL =
+      "select database_name,measurement_name,tag_name,tag_order from root.TAG_INFO ";
+
+  // TODO avoid OOM
+  protected static Map<String, Map<String, Map<String, Integer>>> database2Measurement2TagOrders =
+      new HashMap<>();
+
+  public static Map<String, Integer> getTagOrders(String database, String measurement) {
+    Map<String, Integer> tagOrders = new HashMap<>();
+    Map<String, Map<String, Integer>> measurement2TagOrders =
+        database2Measurement2TagOrders.get(database);
+    if (measurement2TagOrders != null) {
+      tagOrders = measurement2TagOrders.get(measurement);
+    }
+    if (tagOrders == null) {
+      tagOrders = new HashMap<>();
+    }
+    return tagOrders;
+  }
+
+  abstract void recover();
+
+  abstract void setStorageGroup(String database, long sessionID);
+
+  abstract void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID);
+
+  public final synchronized Map<String, Map<String, Integer>> createDatabase(
+      String database, long sessionID) {
+    Map<String, Map<String, Integer>> measurement2TagOrders =
+        database2Measurement2TagOrders.get(database);
+    if (measurement2TagOrders != null) {
+      return measurement2TagOrders;
+    }
+    setStorageGroup(database, sessionID);
+    measurement2TagOrders = new HashMap<>();
+    database2Measurement2TagOrders.put(database, measurement2TagOrders);
+    return measurement2TagOrders;
+  }
+
+  public final synchronized Map<String, Integer> getTagOrdersWithAutoCreatingSchema(
+      String database, String measurement, long sessionID) {
+    return createDatabase(database, sessionID).computeIfAbsent(measurement, m -> new HashMap<>());
+  }
+
+  public final synchronized String generatePath(
+      String database, String measurement, Map<String, String> tags, long sessionID) {
+    Map<String, Integer> tagKeyToLayerOrders =
+        getTagOrdersWithAutoCreatingSchema(database, measurement, sessionID);
+    // to support rollback if persisting new tag info fails
+    Map<String, Integer> newTagKeyToLayerOrders = new HashMap<>(tagKeyToLayerOrders);
+    // record the layer orders of tag keys that the path contains
+    Map<Integer, String> layerOrderToTagKeysInPath = new HashMap<>();
+
+    int tagNumber = tagKeyToLayerOrders.size();
+
+    TagInfoRecords newTagInfoRecords = null;
+    for (Map.Entry<String, String> tag : tags.entrySet()) {
+      final String tagKey = tag.getKey();
+      if (!newTagKeyToLayerOrders.containsKey(tagKey)) {
+        if (newTagInfoRecords == null) {
+          newTagInfoRecords = new TagInfoRecords();
+        }
+        ++tagNumber;
+        newTagInfoRecords.add(database, measurement, tagKey, tagNumber);
+        newTagKeyToLayerOrders.put(tagKey, tagNumber);
+      }
+
+      layerOrderToTagKeysInPath.put(newTagKeyToLayerOrders.get(tagKey), tagKey);
+    }
+
+    if (newTagInfoRecords != null) {
+      updateTagInfoRecords(newTagInfoRecords, sessionID);
+      database2Measurement2TagOrders.get(database).put(measurement, newTagKeyToLayerOrders);
+    }
+
+    StringBuilder path =
+        new StringBuilder("root.").append(database).append(".").append(measurement);
+    for (int i = 1; i <= tagNumber; ++i) {
+      path.append(".")
+          .append(
+              layerOrderToTagKeysInPath.containsKey(i)
+                  ? tags.get(layerOrderToTagKeysInPath.get(i))
+                  : InfluxConstant.PLACE_HOLDER);
+    }
+    return path.toString();
+  }
+}
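
AbstractInfluxDBMetaManager now centralizes the tag bookkeeping shared by both meta managers: every tag key of a measurement gets a stable layer order, and generatePath encodes tag values as additional path levels, filling layers whose tag is absent with InfluxConstant.PLACE_HOLDER (tag keys seen for the first time are assigned the next order and persisted via updateTagInfoRecords). A toy, standard-Java sketch of the encoding, assuming the layer orders are already known; the literal "PH" merely stands in for the real placeholder constant:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Toy illustration of the tag-layer encoding behind generatePath(): each tag key of a
    // measurement has a stable layer order, and a written point becomes
    // root.<database>.<measurement>.<tag value or placeholder>...
    public class TagPathSketch {

      private static final String PLACE_HOLDER = "PH"; // stand-in for InfluxConstant.PLACE_HOLDER

      static String generatePath(
          String database, String measurement, Map<String, Integer> tagOrders, Map<String, String> tags) {
        Map<Integer, String> valueByLayer = new TreeMap<>(); // layer order -> tag value in this point
        for (Map.Entry<String, String> tag : tags.entrySet()) {
          Integer order = tagOrders.get(tag.getKey());
          if (order != null) {
            valueByLayer.put(order, tag.getValue());
          }
        }
        StringBuilder path = new StringBuilder("root.").append(database).append(".").append(measurement);
        for (int i = 1; i <= tagOrders.size(); i++) {
          path.append(".").append(valueByLayer.getOrDefault(i, PLACE_HOLDER));
        }
        return path.toString();
      }

      public static void main(String[] args) {
        Map<String, Integer> tagOrders = new HashMap<>();
        tagOrders.put("region", 1);
        tagOrders.put("host", 2);
        Map<String, String> tags = new HashMap<>();
        tags.put("host", "server01"); // "region" is absent, so layer 1 falls back to the placeholder
        System.out.println(generatePath("db", "cpu", tagOrders, tags));
        // root.db.cpu.PH.server01
      }
    }
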
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
index 0dd094f9da..f2e58de977 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
 import org.apache.iotdb.db.qp.Planner;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -47,29 +46,23 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class InfluxDBMetaManager {
+public class InfluxDBMetaManager extends AbstractInfluxDBMetaManager {
 
   protected final Planner planner;
 
   private final ServiceProvider serviceProvider;
 
-  private static final String SELECT_TAG_INFO_SQL =
-      "select database_name,measurement_name,tag_name,tag_order from root.TAG_INFO ";
-
-  public static InfluxDBMetaManager getInstance() {
-    return InfluxDBMetaManagerHolder.INSTANCE;
-  }
-
-  // TODO avoid OOM
-  private static Map<String, Map<String, Map<String, Integer>>> database2Measurement2TagOrders =
-      new HashMap<>();
-
   private InfluxDBMetaManager() {
     serviceProvider = IoTDB.serviceProvider;
     database2Measurement2TagOrders = new HashMap<>();
     planner = serviceProvider.getPlanner();
   }
 
+  public static InfluxDBMetaManager getInstance() {
+    return InfluxDBMetaManagerHolder.INSTANCE;
+  }
+
+  @Override
   public void recover() {
     long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
     try {
@@ -121,13 +114,8 @@ public class InfluxDBMetaManager {
     }
   }
 
-  public synchronized Map<String, Map<String, Integer>> createDatabase(String database) {
-    Map<String, Map<String, Integer>> measurement2TagOrders =
-        database2Measurement2TagOrders.get(database);
-    if (measurement2TagOrders != null) {
-      return measurement2TagOrders;
-    }
-
+  @Override
+  public void setStorageGroup(String database, long sessionID) {
     try {
       SetStorageGroupPlan setStorageGroupPlan =
           new SetStorageGroupPlan(new PartialPath("root." + database));
@@ -140,61 +128,10 @@ public class InfluxDBMetaManager {
     } catch (IllegalPathException | StorageGroupNotSetException | StorageEngineException e) {
       throw new InfluxDBException(e.getMessage());
     }
-
-    measurement2TagOrders = new HashMap<>();
-    database2Measurement2TagOrders.put(database, measurement2TagOrders);
-    return measurement2TagOrders;
-  }
-
-  public synchronized Map<String, Integer> getTagOrdersWithAutoCreatingSchema(
-      String database, String measurement) {
-    return createDatabase(database).computeIfAbsent(measurement, m -> new HashMap<>());
   }
 
-  public synchronized String generatePath(
-      String database, String measurement, Map<String, String> tags) {
-    Map<String, Integer> tagKeyToLayerOrders =
-        getTagOrdersWithAutoCreatingSchema(database, measurement);
-    // to support rollback if fails to persisting new tag info
-    Map<String, Integer> newTagKeyToLayerOrders = new HashMap<>(tagKeyToLayerOrders);
-    // record the layer orders of tag keys that the path contains
-    Map<Integer, String> layerOrderToTagKeysInPath = new HashMap<>();
-
-    int tagNumber = tagKeyToLayerOrders.size();
-
-    TagInfoRecords newTagInfoRecords = null;
-    for (Map.Entry<String, String> tag : tags.entrySet()) {
-      final String tagKey = tag.getKey();
-      if (!newTagKeyToLayerOrders.containsKey(tagKey)) {
-        if (newTagInfoRecords == null) {
-          newTagInfoRecords = new TagInfoRecords();
-        }
-        ++tagNumber;
-        newTagInfoRecords.add(database, measurement, tagKey, tagNumber);
-        newTagKeyToLayerOrders.put(tagKey, tagNumber);
-      }
-
-      layerOrderToTagKeysInPath.put(newTagKeyToLayerOrders.get(tagKey), tagKey);
-    }
-
-    if (newTagInfoRecords != null) {
-      updateTagInfoRecords(newTagInfoRecords);
-      database2Measurement2TagOrders.get(database).put(measurement, newTagKeyToLayerOrders);
-    }
-
-    StringBuilder path =
-        new StringBuilder("root.").append(database).append(".").append(measurement);
-    for (int i = 1; i <= tagNumber; ++i) {
-      path.append(".")
-          .append(
-              layerOrderToTagKeysInPath.containsKey(i)
-                  ? tags.get(layerOrderToTagKeysInPath.get(i))
-                  : InfluxConstant.PLACE_HOLDER);
-    }
-    return path.toString();
-  }
-
-  private void updateTagInfoRecords(TagInfoRecords tagInfoRecords) {
+  @Override
+  public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) {
     List<InsertRowPlan> plans = tagInfoRecords.convertToInsertRowPlans();
     for (InsertRowPlan plan : plans) {
       try {
@@ -205,19 +142,6 @@ public class InfluxDBMetaManager {
     }
   }
 
-  public static Map<String, Integer> getTagOrders(String database, String measurement) {
-    Map<String, Integer> tagOrders = new HashMap<>();
-    Map<String, Map<String, Integer>> measurement2TagOrders =
-        database2Measurement2TagOrders.get(database);
-    if (measurement2TagOrders != null) {
-      tagOrders = measurement2TagOrders.get(measurement);
-    }
-    if (tagOrders == null) {
-      tagOrders = new HashMap<>();
-    }
-    return tagOrders;
-  }
-
   private static class InfluxDBMetaManagerHolder {
     private static final InfluxDBMetaManager INSTANCE = new InfluxDBMetaManager();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java
new file mode 100644
index 0000000000..5269a2bf44
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
+import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.IoTDBJDBCDataSet;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.influxdb.InfluxDBException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NewInfluxDBMetaManager extends AbstractInfluxDBMetaManager {
+
+  private final ClientRPCServiceImpl clientRPCService;
+
+  private NewInfluxDBMetaManager() {
+    clientRPCService = NewInfluxDBServiceImpl.getClientRPCService();
+  }
+
+  public static NewInfluxDBMetaManager getInstance() {
+    return InfluxDBMetaManagerHolder.INSTANCE;
+  }
+
+  @Override
+  public void recover() {
+    long sessionID = 0;
+    try {
+      TSOpenSessionResp tsOpenSessionResp =
+          clientRPCService.openSession(
+              new TSOpenSessionReq().setUsername("root").setPassword("root"));
+      sessionID = tsOpenSessionResp.getSessionId();
+      TSExecuteStatementResp resp =
+          NewQueryHandler.executeStatement(SELECT_TAG_INFO_SQL, sessionID);
+      IoTDBJDBCDataSet dataSet = QueryResultUtils.creatIoTJDBCDataset(resp);
+      try {
+        Map<String, Map<String, Integer>> measurement2TagOrders;
+        Map<String, Integer> tagOrders;
+        while (dataSet.hasCachedResults()) {
+          dataSet.constructOneRow();
+          String database = dataSet.getString("root.TAG_INFO.database_name");
+          String measurement = dataSet.getString("root.TAG_INFO.measurement_name");
+          String tag = dataSet.getString("root.TAG_INFO.tag_name");
+          Integer tagOrder = dataSet.getInt("root.TAG_INFO.tag_order");
+          if (database2Measurement2TagOrders.containsKey(database)) {
+            measurement2TagOrders = database2Measurement2TagOrders.get(database);
+            if (measurement2TagOrders.containsKey(measurement)) {
+              tagOrders = measurement2TagOrders.get(measurement);
+            } else {
+              tagOrders = new HashMap<>();
+            }
+          } else {
+            measurement2TagOrders = new HashMap<>();
+            tagOrders = new HashMap<>();
+          }
+          tagOrders.put(tag, tagOrder);
+          measurement2TagOrders.put(measurement, tagOrders);
+          database2Measurement2TagOrders.put(database, measurement2TagOrders);
+        }
+      } catch (StatementExecutionException e) {
+        throw new InfluxDBException(e.getMessage());
+      }
+    } catch (Exception e) {
+      throw new InfluxDBException(e.getMessage());
+    } finally {
+      clientRPCService.closeSession(new TSCloseSessionReq().setSessionId(sessionID));
+    }
+  }
+
+  @Override
+  public void setStorageGroup(String database, long sessionID) {
+    TSStatus status = clientRPCService.setStorageGroup(sessionID, "root." + database);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
+      return;
+    }
+    throw new InfluxDBException(status.getMessage());
+  }
+
+  @Override
+  public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) {
+    try {
+      List<TSInsertRecordReq> reqs = tagInfoRecords.convertToInsertRecordsReq(sessionID);
+      for (TSInsertRecordReq tsInsertRecordReq : reqs) {
+        TSStatus tsStatus = clientRPCService.insertRecord(tsInsertRecordReq);
+        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new InfluxDBException(tsStatus.getMessage());
+        }
+      }
+    } catch (IoTDBConnectionException e) {
+      throw new InfluxDBException(e.getMessage());
+    }
+  }
+
+  private static class InfluxDBMetaManagerHolder {
+    private static final NewInfluxDBMetaManager INSTANCE = new NewInfluxDBMetaManager();
+
+    private InfluxDBMetaManagerHolder() {}
+  }
+}
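
recover() above rebuilds the in-memory database -> measurement -> (tag -> order) map from the rows stored under root.TAG_INFO. The same nested-map bookkeeping can be written more compactly with computeIfAbsent; a self-contained sketch over hypothetical rows:

    import java.util.HashMap;
    import java.util.Map;

    // Rebuilding the database -> measurement -> (tag -> order) map from flat tag-info rows,
    // as recover() does, expressed with computeIfAbsent for the nested maps.
    public class TagOrderRecoverySketch {

      public static void main(String[] args) {
        // database_name, measurement_name, tag_name, tag_order (hypothetical rows)
        String[][] rows = {
          {"db1", "cpu", "region", "1"},
          {"db1", "cpu", "host", "2"},
          {"db2", "mem", "host", "1"},
        };
        Map<String, Map<String, Map<String, Integer>>> db2Measurement2TagOrders = new HashMap<>();
        for (String[] row : rows) {
          db2Measurement2TagOrders
              .computeIfAbsent(row[0], db -> new HashMap<>())
              .computeIfAbsent(row[1], m -> new HashMap<>())
              .put(row[2], Integer.parseInt(row[3]));
        }
        System.out.println(db2Measurement2TagOrders);
        // e.g. {db1={cpu={region=1, host=2}}, db2={mem={host=1}}} (HashMap order is not guaranteed)
      }
    }
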
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
index e6cf7b4f41..c38df89d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfoRecords.java
@@ -24,18 +24,21 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.DataTypeUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.influxdb.InfluxDBException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class TagInfoRecords {
 
   private static final String TAG_INFO_DEVICE_ID = "root.TAG_INFO";
   private static final List<String> TAG_INFO_MEASUREMENTS = new ArrayList<>();
   private static final List<TSDataType> TAG_INFO_TYPES = new ArrayList<>();
+  private static final AtomicLong TAG_TIME_STAMPS = new AtomicLong();
 
   static {
     TAG_INFO_MEASUREMENTS.add("database_name");
@@ -65,11 +68,8 @@ public class TagInfoRecords {
 
   public void add(String database, String measurement, String tag, int order) {
     deviceIds.add(TAG_INFO_DEVICE_ID);
-    if (times.size() == 0) {
-      times.add(System.currentTimeMillis());
-    } else {
-      times.add(times.get(times.size() - 1) + 1);
-    }
+    // adjacent records may otherwise share a timestamp, so use a monotonically increasing counter
+    times.add(TAG_TIME_STAMPS.getAndIncrement());
     measurementsList.add(TAG_INFO_MEASUREMENTS);
     typesList.add(TAG_INFO_TYPES);
 
@@ -98,4 +98,22 @@ public class TagInfoRecords {
     }
     return insertRowPlans;
   }
+
+  public List<TSInsertRecordReq> convertToInsertRecordsReq(long sessionID)
+      throws IoTDBConnectionException {
+    ArrayList<TSInsertRecordReq> reqs = new ArrayList<>();
+    long now = 0;
+    for (int i = 0; i < deviceIds.size(); i++) {
+      TSInsertRecordReq tsInsertRecordReq = new TSInsertRecordReq();
+      tsInsertRecordReq.setSessionId(sessionID);
+      tsInsertRecordReq.setTimestamp(times.get(i));
+      tsInsertRecordReq.setIsAligned(false);
+      tsInsertRecordReq.setPrefixPath(deviceIds.get(i));
+      tsInsertRecordReq.setMeasurements(measurementsList.get(i));
+      tsInsertRecordReq.setValues(
+          DataTypeUtils.getValueBuffer(typesList.get(i), valuesList.get(i)));
+      reqs.add(tsInsertRecordReq);
+    }
+    return reqs;
+  }
 }
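
The timestamp change in add() is worth a note: every tag-info row is written under the single device root.TAG_INFO, so two rows carrying the same timestamp would land on the same series and time and the later one would win, and the previous "last time + 1" scheme only spread timestamps within one TagInfoRecords instance. A process-wide AtomicLong hands out a distinct, increasing value per call even across threads; the values are synthetic counters rather than wall-clock times. A small standalone demonstration of that guarantee:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.stream.IntStream;

    // Shows that AtomicLong.getAndIncrement() never hands out the same value twice, while
    // wall-clock timestamps taken in quick succession frequently collide.
    public class UniqueTimestampDemo {

      private static final AtomicLong COUNTER = new AtomicLong();

      public static void main(String[] args) {
        Set<Long> issued = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> issued.add(COUNTER.getAndIncrement()));
        System.out.println(issued.size()); // 10000 distinct values, no duplicates

        long a = System.currentTimeMillis();
        long b = System.currentTimeMillis();
        System.out.println(a == b); // often true: two calls can land in the same millisecond
      }
    }
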
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java
new file mode 100644
index 0000000000..c3c27af3d8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/InfluxReqAndRespUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.influxdb.util;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.utils.DataTypeUtils;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+public class InfluxReqAndRespUtils {
+
+  public static TSOpenSessionReq convertOpenSessionReq(InfluxOpenSessionReq influxOpenSessionReq) {
+    TSOpenSessionReq tsOpenSessionReq = new TSOpenSessionReq();
+    tsOpenSessionReq.setZoneId(influxOpenSessionReq.getZoneId());
+    tsOpenSessionReq.setUsername(influxOpenSessionReq.getUsername());
+    tsOpenSessionReq.setPassword(influxOpenSessionReq.getPassword());
+    tsOpenSessionReq.setConfiguration(influxOpenSessionReq.getConfiguration());
+    return tsOpenSessionReq;
+  }
+
+  public static InfluxOpenSessionResp convertOpenSessionResp(TSOpenSessionResp tsOpenSessionResp) {
+    InfluxOpenSessionResp influxOpenSessionResp = new InfluxOpenSessionResp();
+    influxOpenSessionResp.setSessionId(tsOpenSessionResp.getSessionId());
+    TSStatus tsStatus = tsOpenSessionResp.getStatus();
+    influxOpenSessionResp.setStatus(DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus));
+    return influxOpenSessionResp;
+  }
+
+  public static TSCloseSessionReq convertCloseSessionReq(
+      InfluxCloseSessionReq influxCloseSessionReq) {
+    TSCloseSessionReq tsCloseSessionReq = new TSCloseSessionReq();
+    tsCloseSessionReq.setSessionId(influxCloseSessionReq.getSessionId());
+    return tsCloseSessionReq;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
index 686dd20b91..325971df25 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
@@ -19,17 +19,23 @@
 package org.apache.iotdb.db.protocol.influxdb.util;
 
 import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant;
+import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.rpc.IoTDBJDBCDataSet;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.influxdb.InfluxDBException;
 import org.influxdb.dto.QueryResult;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -287,4 +293,151 @@ public class QueryResultUtils {
   public static boolean checkQueryResultNull(QueryResult queryResult) {
     return queryResult.getResults().get(0).getSeries() == null;
   }
+
+  public static List<String> getFullPaths(TSExecuteStatementResp tsExecuteStatementResp) {
+    List<String> res = new ArrayList<>();
+    IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+    try {
+      while (ioTDBJDBCDataSet.hasCachedResults()) {
+        ioTDBJDBCDataSet.constructOneRow();
+        String path = ioTDBJDBCDataSet.getValueByName("timeseries");
+        res.add(path);
+      }
+    } catch (StatementExecutionException e) {
+      throw new InfluxDBException(e.getMessage());
+    }
+    return res;
+  }
+
+  public static QueryResult iotdbResultConvertInfluxResult(
+      TSExecuteStatementResp tsExecuteStatementResp,
+      String database,
+      String measurement,
+      Map<String, Integer> fieldOrders) {
+    if (tsExecuteStatementResp == null) {
+      return getNullQueryResult();
+    }
+    // generate series
+    QueryResult.Series series = new QueryResult.Series();
+    series.setName(measurement);
+    // get the reverse map of the tag orders (layer order -> tag key)
+    Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
+    Map<Integer, String> tagOrderReversed =
+        tagOrders.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+    Map<Integer, String> fieldOrdersReversed =
+        fieldOrders.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+    int tagSize = tagOrderReversed.size();
+    ArrayList<String> tagList = new ArrayList<>();
+    for (int i = 1; i <= tagSize; i++) {
+      tagList.add(tagOrderReversed.get(i));
+    }
+
+    ArrayList<String> fieldList = new ArrayList<>();
+    for (int i = 1 + tagSize; i < 1 + tagSize + fieldOrders.size(); i++) {
+      fieldList.add(fieldOrdersReversed.get(i));
+    }
+    ArrayList<String> columns = new ArrayList<>();
+    columns.add("time");
+    columns.addAll(tagList);
+    columns.addAll(fieldList);
+    // insert columns into series
+    series.setColumns(columns);
+    List<List<Object>> values = new ArrayList<>();
+    IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+    try {
+      while (ioTDBJDBCDataSet.hasCachedResults()) {
+        Object[] value = new Object[columns.size()];
+        ioTDBJDBCDataSet.constructOneRow();
+        value[0] = Long.valueOf(ioTDBJDBCDataSet.getValueByName("Time"));
+        String deviceName = ioTDBJDBCDataSet.getValueByName("Device");
+        String[] deviceNameList = deviceName.split("\\.");
+        for (int i = 3; i < deviceNameList.length; i++) {
+          if (!deviceNameList[i].equals(InfluxConstant.PLACE_HOLDER)) {
+            value[i - 2] = deviceNameList[i];
+          }
+        }
+        for (int i = 3; i <= ioTDBJDBCDataSet.columnNameList.size(); i++) {
+          Object o = ioTDBJDBCDataSet.getObject(ioTDBJDBCDataSet.findColumnNameByIndex(i));
+          if (o != null) {
+            // insert the field value at its column position
+            value[fieldOrders.get(ioTDBJDBCDataSet.findColumnNameByIndex(i))] = o;
+          }
+        }
+        values.add(Arrays.asList(value));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    series.setValues(values);
+
+    QueryResult queryResult = new QueryResult();
+    QueryResult.Result result = new QueryResult.Result();
+    result.setSeries(new ArrayList<>(Arrays.asList(series)));
+    queryResult.setResults(new ArrayList<>(Arrays.asList(result)));
+
+    return queryResult;
+  }
+
+  public static List<InfluxFunctionValue> getInfluxFunctionValues(
+      TSExecuteStatementResp tsExecuteStatementResp) {
+    IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+    List<InfluxFunctionValue> result = new ArrayList<>(ioTDBJDBCDataSet.columnSize);
+    try {
+      while (ioTDBJDBCDataSet.hasCachedResults()) {
+        ioTDBJDBCDataSet.constructOneRow();
+        Long timestamp = null;
+        for (String columnName : ioTDBJDBCDataSet.columnNameList) {
+          if ("Time".equals(columnName)) {
+            timestamp = ioTDBJDBCDataSet.getTimestamp(columnName).getTime();
+            continue;
+          }
+          Object o = ioTDBJDBCDataSet.getObject(columnName);
+          result.add(new InfluxFunctionValue(o, timestamp));
+        }
+      }
+    } catch (StatementExecutionException e) {
+      throw new InfluxDBException(e.getMessage());
+    }
+    return result;
+  }
+
+  public static Map<String, Object> getColumnNameAndValue(
+      TSExecuteStatementResp tsExecuteStatementResp) {
+    IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp);
+    Map<String, Object> result = new HashMap<>();
+    try {
+      while (ioTDBJDBCDataSet.hasCachedResults()) {
+        ioTDBJDBCDataSet.constructOneRow();
+        for (String columnName : ioTDBJDBCDataSet.columnNameList) {
+          Object o = ioTDBJDBCDataSet.getObject(columnName);
+          result.put(columnName, o);
+        }
+      }
+    } catch (StatementExecutionException e) {
+      throw new InfluxDBException(e.getMessage());
+    }
+    return result;
+  }
+
+  public static IoTDBJDBCDataSet creatIoTJDBCDataset(
+      TSExecuteStatementResp tsExecuteStatementResp) {
+    return new IoTDBJDBCDataSet(
+        null,
+        tsExecuteStatementResp.getColumns(),
+        tsExecuteStatementResp.getDataTypeList(),
+        tsExecuteStatementResp.columnNameIndexMap,
+        tsExecuteStatementResp.ignoreTimeStamp,
+        tsExecuteStatementResp.queryId,
+        0,
+        null,
+        0,
+        tsExecuteStatementResp.queryDataSet,
+        0,
+        0,
+        tsExecuteStatementResp.sgColumns,
+        null);
+  }
 }
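
The helpers added above all consume a TSExecuteStatementResp the same way: wrap it with creatIoTJDBCDataset, then loop hasCachedResults()/constructOneRow() and read columns by name. A hedged sketch of that loop in isolation, using only calls that already appear in this patch (printAllRows is an illustrative name):

    import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils;
    import org.apache.iotdb.rpc.IoTDBJDBCDataSet;
    import org.apache.iotdb.rpc.StatementExecutionException;
    import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;

    import org.influxdb.InfluxDBException;

    // Common row-iteration pattern behind getFullPaths(), getInfluxFunctionValues() and
    // getColumnNameAndValue(): wrap the statement response, then pull rows one by one.
    public class ResultSetLoopSketch {

      static void printAllRows(TSExecuteStatementResp resp) {
        IoTDBJDBCDataSet dataSet = QueryResultUtils.creatIoTJDBCDataset(resp);
        try {
          while (dataSet.hasCachedResults()) { // more rows are buffered in the response
            dataSet.constructOneRow(); // materialize the next row
            for (String columnName : dataSet.columnNameList) {
              System.out.println(columnName + " = " + dataSet.getObject(columnName));
            }
          }
        } catch (StatementExecutionException e) {
          throw new InfluxDBException(e.getMessage());
        }
      }
    }
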
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
index 40d588c979..9cda89a43e 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
@@ -72,6 +72,17 @@ public class StringUtils {
     return tmpList[tmpList.length - 1];
   }
 
+  /**
+   * get the devicePath from the fullPath
+   *
+   * @param path path to process
+   * @return devicePath
+   */
+  public static String getDeviceByPath(String path) {
+    String field = getFieldByPath(path);
+    return path.substring(0, path.length() - field.length() - 1);
+  }
+
   /**
    * determine whether the two string lists are the same
    *
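
getDeviceByPath complements getFieldByPath: the latter returns the last path segment (the field), the former keeps everything before it (the device). A quick usage example with an illustrative path:

    import org.apache.iotdb.db.protocol.influxdb.util.StringUtils;

    // The new helper strips the final (field) segment from a full timeseries path.
    public class PathSplitExample {

      public static void main(String[] args) {
        String fullPath = "root.monitor.cpu.bj.server01.usage";
        // getFieldByPath(fullPath) yields "usage"; getDeviceByPath keeps the device prefix:
        System.out.println(StringUtils.getDeviceByPath(fullPath)); // root.monitor.cpu.bj.server01
      }
    }
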
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java b/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
index 4f3ffe3ccb..3930e27fe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InfluxDBRPCService.java
@@ -27,11 +27,13 @@ import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.thrift.handler.InfluxDBServiceThriftHandler;
-import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.IInfluxDBServiceWithHandler;
+import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService.Processor;
 
 public class InfluxDBRPCService extends ThriftService implements InfluxDBRPCServiceMBean {
-  private InfluxDBServiceImpl impl;
+  private IInfluxDBServiceWithHandler impl;
 
   public static InfluxDBRPCService getInstance() {
     return InfluxDBServiceHolder.INSTANCE;
@@ -40,17 +42,25 @@ public class InfluxDBRPCService extends ThriftService implements InfluxDBRPCServ
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl =
-        (InfluxDBServiceImpl)
-            Class.forName(IoTDBDescriptor.getInstance().getConfig().getInfluxDBImplClassName())
-                .newInstance();
+    if (IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getRpcImplClassName()
+        .equals(ClientRPCServiceImpl.class.getName())) {
+      impl =
+          (IInfluxDBServiceWithHandler)
+              Class.forName(NewInfluxDBServiceImpl.class.getName()).newInstance();
+    } else {
+      impl =
+          (IInfluxDBServiceWithHandler)
+              Class.forName(IoTDBDescriptor.getInstance().getConfig().getInfluxDBImplClassName())
+                  .newInstance();
+    }
     initSyncedServiceImpl(null);
     processor = new Processor<>(impl);
   }
 
   @Override
-  public void initThriftServiceThread()
-      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+  public void initThriftServiceThread() throws IllegalAccessException {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     try {
       thriftServiceThread =
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 75ad2c8a30..031dcb1e49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.service.thrift.handler;
 
-import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.IInfluxDBServiceWithHandler;
 
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
@@ -27,10 +27,10 @@ import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
 
 public class InfluxDBServiceThriftHandler implements TServerEventHandler {
-  private final InfluxDBServiceImpl influxDBServiceImpl;
+  private final IInfluxDBServiceWithHandler impl;
 
-  public InfluxDBServiceThriftHandler(InfluxDBServiceImpl influxDBServiceImpl) {
-    this.influxDBServiceImpl = influxDBServiceImpl;
+  public InfluxDBServiceThriftHandler(IInfluxDBServiceWithHandler impl) {
+    this.impl = impl;
   }
 
   @Override
@@ -48,7 +48,7 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
   public void deleteContext(
       ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
     // release resources.
-    influxDBServiceImpl.handleClientExit();
+    impl.handleClientExit();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java
new file mode 100644
index 0000000000..d71e7570a0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IInfluxDBServiceWithHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService;
+
+public interface IInfluxDBServiceWithHandler extends InfluxDBService.Iface {
+  void handleClientExit();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index dee86edde3..89e5429dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint;
+import org.apache.iotdb.db.protocol.influxdb.handler.AbstractQueryHandler;
 import org.apache.iotdb.db.protocol.influxdb.handler.QueryHandler;
 import org.apache.iotdb.db.protocol.influxdb.input.InfluxLineParser;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
 import org.apache.iotdb.db.protocol.influxdb.sql.InfluxDBLogicalGenerator;
@@ -41,7 +43,6 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.utils.DataTypeUtils;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
-import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryReq;
@@ -60,14 +61,17 @@ import org.influxdb.dto.Point;
 import java.util.ArrayList;
 import java.util.List;
 
-public class InfluxDBServiceImpl implements InfluxDBService.Iface {
+public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
 
   private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  private final InfluxDBMetaManager metaManager;
+  private final AbstractInfluxDBMetaManager metaManager;
+
+  private final AbstractQueryHandler queryHandler;
 
   public InfluxDBServiceImpl() {
     metaManager = InfluxDBMetaManager.getInstance();
+    queryHandler = new QueryHandler();
   }
 
   @Override
@@ -100,7 +104,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
     int executeCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
     for (Point point :
         InfluxLineParser.parserRecordsToPointsWithPrecision(req.lineProtocol, req.precision)) {
-      IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager);
+      IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager, req.sessionId);
+
       try {
         InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
         InfluxTSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
@@ -121,7 +126,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
   }
 
   @Override
-  public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) throws TException {
+  public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
     if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
       return getNotLoggedInStatus();
     }
@@ -145,11 +150,12 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
   @Override
   public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException {
     Operator operator = InfluxDBLogicalGenerator.generate(req.command);
-    QueryHandler.checkInfluxDBQueryOperator(operator);
-    return QueryHandler.queryInfluxDB(
+    queryHandler.checkInfluxDBQueryOperator(operator);
+    return queryHandler.queryInfluxDB(
         req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider);
   }
 
+  @Override
   public void handleClientExit() {
     Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
     if (sessionId != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java
new file mode 100644
index 0000000000..422bc27fd4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint;
+import org.apache.iotdb.db.protocol.influxdb.handler.AbstractQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler;
+import org.apache.iotdb.db.protocol.influxdb.input.InfluxLineParser;
+import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager;
+import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator;
+import org.apache.iotdb.db.protocol.influxdb.sql.InfluxDBLogicalGenerator;
+import org.apache.iotdb.db.protocol.influxdb.util.InfluxReqAndRespUtils;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.DataTypeUtils;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxOpenSessionResp;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryReq;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus;
+import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxWritePointsReq;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.apache.thrift.TException;
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.Point;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class NewInfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
+
+  private static final ClientRPCServiceImpl clientRPCService = new ClientRPCServiceImpl();
+
+  private final AbstractInfluxDBMetaManager metaManager;
+
+  private final AbstractQueryHandler queryHandler;
+
+  public NewInfluxDBServiceImpl() {
+    metaManager = NewInfluxDBMetaManager.getInstance();
+    queryHandler = new NewQueryHandler();
+  }
+
+  public static ClientRPCServiceImpl getClientRPCService() {
+    return clientRPCService;
+  }
+
+  @Override
+  public InfluxOpenSessionResp openSession(InfluxOpenSessionReq req) throws TException {
+    TSOpenSessionReq tsOpenSessionReq = InfluxReqAndRespUtils.convertOpenSessionReq(req);
+    TSOpenSessionResp tsOpenSessionResp = clientRPCService.openSession(tsOpenSessionReq);
+    return InfluxReqAndRespUtils.convertOpenSessionResp(tsOpenSessionResp);
+  }
+
+  @Override
+  public InfluxTSStatus closeSession(InfluxCloseSessionReq req) {
+    TSCloseSessionReq tsCloseSessionReq = InfluxReqAndRespUtils.convertCloseSessionReq(req);
+    TSStatus tsStatus = clientRPCService.closeSession(tsCloseSessionReq);
+    return DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus);
+  }
+
+  @Override
+  public InfluxTSStatus writePoints(InfluxWritePointsReq req) {
+    List<InfluxTSStatus> tsStatusList = new ArrayList<>();
+    int executeCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    for (Point point :
+        InfluxLineParser.parserRecordsToPointsWithPrecision(req.lineProtocol, req.precision)) {
+      IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager, req.sessionId);
+      try {
+        TSInsertRecordReq insertRecordReq = iotdbPoint.convertToTSInsertRecordReq(req.sessionId);
+        TSStatus tsStatus = clientRPCService.insertRecord(insertRecordReq);
+        tsStatusList.add(DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus));
+      } catch (IoTDBConnectionException e) {
+        throw new InfluxDBException(e.getMessage());
+      }
+    }
+    return new InfluxTSStatus().setCode(executeCode).setSubStatus(tsStatusList);
+  }
+
+  @Override
+  public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
+    TSStatus tsStatus =
+        clientRPCService.setStorageGroup(req.sessionId, "root." + req.getDatabase());
+    if (tsStatus.getCode() == TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) {
+      tsStatus.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      tsStatus.setMessage("Execute successfully");
+    }
+    return DataTypeUtils.RPCStatusToInfluxDBTSStatus(tsStatus);
+  }
+
+  @Override
+  public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException {
+    Operator operator = InfluxDBLogicalGenerator.generate(req.command);
+    queryHandler.checkInfluxDBQueryOperator(operator);
+    return queryHandler.queryInfluxDB(
+        req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider);
+  }
+
+  @Override
+  public void handleClientExit() {
+    clientRPCService.handleClientExit();
+  }
+}
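
Taken together, NewInfluxDBServiceImpl keeps the InfluxDB wire protocol but funnels every operation through the standard client RPC service, so a write turns one line-protocol record into a single TSInsertRecordReq whose prefix path encodes the tag values. A hypothetical walk-through in plain Java, with simple string handling standing in for InfluxLineParser and the meta manager's tag orders (database name, tags and fields are made up):

    // Hypothetical one-record walk-through of the write path used by writePoints():
    // line protocol -> Point -> IoTDBPoint -> TSInsertRecordReq -> insertRecord().
    // Plain string handling imitates what InfluxLineParser and the meta manager's tag
    // orders do in the real code.
    public class WriteFlowSketch {

      public static void main(String[] args) {
        String database = "monitor";
        String line = "cpu,region=bj,host=server01 usage=0.64,temperature=56.0";

        String[] head = line.split(" ")[0].split(",");
        String measurement = head[0];
        StringBuilder devicePath = new StringBuilder("root." + database + "." + measurement);
        for (int i = 1; i < head.length; i++) {
          devicePath.append(".").append(head[i].split("=")[1]); // each tag value becomes a path level
        }
        System.out.println(devicePath); // root.monitor.cpu.bj.server01
        // The field set "usage=0.64,temperature=56.0" becomes the measurements and values of one
        // TSInsertRecordReq whose prefixPath is the device path printed above.
      }
    }
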