You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/03/25 07:10:34 UTC

[iotdb] branch master updated: IOTDB-1241 support redirect query for cluster (#2867)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b22439  IOTDB-1241 support redirect query for cluster (#2867)
9b22439 is described below

commit 9b22439d047ed0915004139e4be7cdd12854d507
Author: chaow <cc...@163.com>
AuthorDate: Thu Mar 25 15:10:11 2021 +0800

    IOTDB-1241 support redirect query for cluster (#2867)
    
    * IOTDB-1241 support redirect query for cluster
    * implement redirect with only partitionTable
---
 .../resources/conf/iotdb-cluster.properties        |   2 +
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 +
 .../apache/iotdb/cluster/metadata/CMManager.java   |   5 +-
 .../cluster/query/ClusterDataQueryExecutor.java    | 133 ++++++++++++++++++-
 .../query/aggregate/ClusterAggregateExecutor.java  |   2 +-
 .../groupby/ClusterGroupByVFilterDataSet.java      |   2 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  26 ++--
 .../cluster/query/reader/ClusterTimeGenerator.java | 145 +++++++++++++++++++--
 .../query/ClusterDataQueryExecutorTest.java        |  83 ++++++++++++
 .../query/reader/ClusterTimeGeneratorTest.java     |   2 +-
 .../main/java/org/apache/iotdb/SessionExample.java | 102 ++++++++++++++-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  10 ++
 .../db/query/dataset/AlignByDeviceDataSet.java     |  11 ++
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  11 ++
 .../db/query/executor/RawDataQueryExecutor.java    |  25 ++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  78 ++++++++++-
 .../org/apache/iotdb/rpc/RedirectException.java    |   3 +-
 .../java/org/apache/iotdb/session/Session.java     | 103 +++++++++++++--
 .../apache/iotdb/session/SessionConnection.java    |  32 ++++-
 thrift/src/main/thrift/rpc.thrift                  |   3 +
 .../tsfile/read/query/dataset/QueryDataSet.java    |  48 +++++++
 22 files changed, 792 insertions(+), 49 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 3c9fabe..4126539 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -171,3 +171,5 @@ max_client_pernode_permember_number=1000
 # we need to wait so much time for other connections to be released until timeout,
 # or a new connection will be created.
 wait_client_timeout_ms=5000
+
+enable_query_redirect=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 11cedc8..3d1cd32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -164,6 +164,8 @@ public class ClusterConfig {
 
   private boolean openServerRpcPort = false;
 
+  private boolean enableQueryRedirect = false;
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
@@ -467,4 +469,12 @@ public class ClusterConfig {
   public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
     this.waitClientTimeoutMS = waitClientTimeoutMS;
   }
+
+  public boolean isEnableQueryRedirect() {
+    return enableQueryRedirect;
+  }
+
+  public void setEnableQueryRedirect(boolean enableQueryRedirect) {
+    this.enableQueryRedirect = enableQueryRedirect;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 25f85e0..a2488f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -299,6 +299,11 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS()))));
 
+    config.setEnableQueryRedirect(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 88451b7..da2d6ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1236,11 +1236,12 @@ public class CMManager extends MManager {
       try {
         Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
         logger.debug(
-            "{}: get matched paths of {} from {}, result {}",
+            "{}: get matched paths of {} from {}, result {} for {}",
             metaGroupMember.getName(),
             partitionGroup,
             node,
-            paths);
+            paths,
+            pathsToQuery);
         if (paths != null) {
           // query next group
           Set<PartialPath> partialPaths = new HashSet<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index fbf8963..da371d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -21,14 +21,17 @@ package org.apache.iotdb.cluster.query;
 
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.EmptyIntervalException;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
 import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
 import org.apache.iotdb.db.query.executor.RawDataQueryExecutor;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -36,7 +39,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +56,8 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
   private static final Logger logger = LoggerFactory.getLogger(ClusterDataQueryExecutor.class);
   private MetaGroupMember metaGroupMember;
   private ClusterReaderFactory readerFactory;
+  private QueryDataSet.EndPoint endPoint = null;
+  private boolean hasLocalReader = false;
 
   ClusterDataQueryExecutor(RawDataQueryPlan plan, MetaGroupMember metaGroupMember) {
     super(plan);
@@ -74,6 +81,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     }
 
     List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+    hasLocalReader = false;
     for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
       PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -99,6 +107,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     if (logger.isDebugEnabled()) {
       logger.debug("Initialized {} readers for {}", readersOfSelectedSeries.size(), queryPlan);
     }
+
     return readersOfSelectedSeries;
   }
 
@@ -114,6 +123,128 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
   protected TimeGenerator getTimeGenerator(
       IExpression queryExpression, QueryContext context, RawDataQueryPlan rawDataQueryPlan)
       throws StorageEngineException {
-    return new ClusterTimeGenerator(queryExpression, context, metaGroupMember, rawDataQueryPlan);
+    return new ClusterTimeGenerator(
+        queryExpression, context, metaGroupMember, rawDataQueryPlan, false);
+  }
+
+  @Override
+  protected QueryDataSet needRedirect(QueryContext context, boolean hasValueFilter)
+      throws StorageEngineException {
+    if (queryPlan.isEnableRedirect()) {
+      if (hasValueFilter) {
+        // 1. check time Generator has local data
+        ClusterTimeGenerator clusterTimeGenerator =
+            new ClusterTimeGenerator(
+                queryPlan.getExpression(), context, metaGroupMember, queryPlan, true);
+        if (clusterTimeGenerator.isHasLocalReader()) {
+          this.hasLocalReader = true;
+          this.endPoint = null;
+        }
+
+        // 2. check data reader has local data
+        checkReaderHasLocalData(context, true);
+      } else {
+        // check data reader has local data
+        checkReaderHasLocalData(context, false);
+      }
+
+      logger.debug(
+          "redirect queryId {}, {}, {}, {}",
+          context.getQueryId(),
+          hasLocalReader,
+          hasValueFilter,
+          endPoint);
+
+      if (!hasLocalReader) {
+        // dummy dataSet
+        QueryDataSet dataSet = new RawQueryDataSetWithoutValueFilter(context.getQueryId());
+        dataSet.setEndPoint(endPoint);
+        return dataSet;
+      }
+    }
+    return null;
+  }
+
+  @SuppressWarnings({"squid:S3776", "squid:S1141"})
+  private void checkReaderHasLocalData(QueryContext context, boolean hasValueFilter)
+      throws StorageEngineException {
+    Filter timeFilter = null;
+    if (!hasValueFilter && queryPlan.getExpression() != null) {
+      timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter();
+    }
+
+    // make sure the partition table is new
+    try {
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
+    } catch (CheckConsistencyException e) {
+      throw new StorageEngineException(e);
+    }
+
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+      TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+
+      try {
+        List<PartitionGroup> partitionGroups = null;
+        if (hasValueFilter) {
+          partitionGroups = metaGroupMember.routeFilter(null, path);
+        } else {
+          partitionGroups = metaGroupMember.routeFilter(timeFilter, path);
+        }
+
+        for (PartitionGroup partitionGroup : partitionGroups) {
+          if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+            DataGroupMember dataGroupMember =
+                metaGroupMember.getLocalDataMember(
+                    partitionGroup.getHeader(),
+                    String.format(
+                        "Query: %s, time filter: %s, queryId: %d",
+                        path, null, context.getQueryId()));
+
+            if (hasValueFilter) {
+              IReaderByTimestamp readerByTimestamp =
+                  readerFactory.getReaderByTimestamp(
+                      path,
+                      queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+                      dataType,
+                      context,
+                      dataGroupMember,
+                      queryPlan.isAscending());
+
+              if (readerByTimestamp != null) {
+                this.hasLocalReader = true;
+                this.endPoint = null;
+              }
+            } else {
+              IPointReader pointReader =
+                  readerFactory.getSeriesPointReader(
+                      path,
+                      queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+                      dataType,
+                      timeFilter,
+                      null,
+                      context,
+                      dataGroupMember,
+                      queryPlan.isAscending());
+
+              if (pointReader.hasNextTimeValuePair()) {
+                this.hasLocalReader = true;
+                this.endPoint = null;
+                pointReader.close();
+                break;
+              }
+              pointReader.close();
+            }
+          } else if (endPoint == null) {
+            endPoint =
+                new QueryDataSet.EndPoint(
+                    partitionGroup.getHeader().getClientIp(),
+                    partitionGroup.getHeader().getClientPort());
+          }
+        }
+      } catch (Exception e) {
+        throw new StorageEngineException(e);
+      }
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index 97f2b58..ba6eba3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -85,7 +85,7 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
   @Override
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan rawDataQueryPlan)
       throws StorageEngineException {
-    return new ClusterTimeGenerator(expression, context, metaMember, rawDataQueryPlan);
+    return new ClusterTimeGenerator(expression, context, metaMember, rawDataQueryPlan, false);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
index 71faf40..a954234 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
@@ -63,7 +63,7 @@ public class ClusterGroupByVFilterDataSet extends GroupByWithValueFilterDataSet
   protected TimeGenerator getTimeGenerator(
       IExpression expression, QueryContext context, RawDataQueryPlan rawDataQueryPlan)
       throws StorageEngineException {
-    return new ClusterTimeGenerator(expression, context, metaGroupMember, rawDataQueryPlan);
+    return new ClusterTimeGenerator(expression, context, metaGroupMember, rawDataQueryPlan, false);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 91819b1..77ffc01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -121,11 +121,11 @@ public class ClusterReaderFactory {
     List<IReaderByTimestamp> readers = new ArrayList<>(partitionGroups.size());
     for (PartitionGroup partitionGroup : partitionGroups) {
       // query each group to get a reader in that group
-      readers.add(
+      IReaderByTimestamp readerByTimestamp =
           getSeriesReaderByTime(
-              partitionGroup, path, deviceMeasurements, context, dataType, ascending));
+              partitionGroup, path, deviceMeasurements, context, dataType, ascending);
+      readers.add(readerByTimestamp);
     }
-    // merge the readers
     return new MergedReaderByTime(readers);
   }
 
@@ -311,13 +311,13 @@ public class ClusterReaderFactory {
    * Create an IPointReader of "path" with “timeFilter” and "valueFilter". A synchronization with
    * the leader will be performed according to consistency level
    *
-   * @param path
-   * @param dataType
+   * @param path series path
+   * @param dataType data type
    * @param timeFilter nullable
    * @param valueFilter nullable
-   * @param context
-   * @return
-   * @throws StorageEngineException
+   * @param context query context
+   * @return reader
+   * @throws StorageEngineException encounter exception
    */
   public IPointReader getSeriesPointReader(
       PartialPath path,
@@ -351,13 +351,13 @@ public class ClusterReaderFactory {
    * Create a SeriesReader of "path" with “timeFilter” and "valueFilter". The consistency is not
    * guaranteed here and only data slots managed by the member will be queried.
    *
-   * @param path
-   * @param dataType
+   * @param path series path
+   * @param dataType data type
    * @param timeFilter nullable
    * @param valueFilter nullable
-   * @param context
-   * @return
-   * @throws StorageEngineException
+   * @param context query context
+   * @return reader for series
+   * @throws StorageEngineException encounter exception
    */
   private SeriesReader getSeriesReader(
       PartialPath path,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index b7b5f51..371c81b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -21,39 +21,59 @@ package org.apache.iotdb.cluster.query.reader;
 
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 public class ClusterTimeGenerator extends ServerTimeGenerator {
 
   private ClusterReaderFactory readerFactory;
+  private boolean hasLocalReader = false;
+  private QueryDataSet.EndPoint endPoint = null;
 
   /** Constructor of EngineTimeGenerator. */
   public ClusterTimeGenerator(
       IExpression expression,
       QueryContext context,
       MetaGroupMember metaGroupMember,
-      RawDataQueryPlan rawDataQueryPlan)
+      RawDataQueryPlan rawDataQueryPlan,
+      boolean onlyCheckLocalData)
       throws StorageEngineException {
     super(context);
     this.queryPlan = rawDataQueryPlan;
     this.readerFactory = new ClusterReaderFactory(metaGroupMember);
     try {
       readerFactory.syncMetaGroup();
-      constructNode(expression);
+      if (onlyCheckLocalData) {
+        whetherHasLocalDataGroup(expression, metaGroupMember, queryPlan.isAscending());
+      } else {
+        constructNode(expression);
+      }
     } catch (IOException | CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
@@ -65,20 +85,125 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
     Filter filter = expression.getFilter();
     PartialPath path = (PartialPath) expression.getSeriesPath();
     TSDataType dataType;
+    ManagedSeriesReader mergeReader = null;
     try {
       dataType =
           ((CMManager) IoTDB.metaManager)
               .getSeriesTypesByPaths(Collections.singletonList(path), null)
               .left
               .get(0);
-      return readerFactory.getSeriesReader(
-          path,
-          queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-          dataType,
-          null,
-          filter,
-          context,
-          queryPlan.isAscending());
+      mergeReader =
+          readerFactory.getSeriesReader(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              dataType,
+              null,
+              filter,
+              context,
+              queryPlan.isAscending());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return mergeReader;
+  }
+
+  public boolean isHasLocalReader() {
+    return hasLocalReader;
+  }
+
+  public void setHasLocalReader(boolean hasLocalReader) {
+    this.hasLocalReader = hasLocalReader;
+  }
+
+  public QueryDataSet.EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", has local reader:" + hasLocalReader;
+  }
+
+  public void whetherHasLocalDataGroup(
+      IExpression expression, MetaGroupMember metaGroupMember, boolean isAscending)
+      throws IOException {
+    this.hasLocalReader = false;
+    constructNode(expression, metaGroupMember, isAscending);
+  }
+
+  private Node constructNode(
+      IExpression expression, MetaGroupMember metaGroupMember, boolean isAscending)
+      throws IOException {
+    if (expression.getType() == ExpressionType.SERIES) {
+      SingleSeriesExpression singleSeriesExp = (SingleSeriesExpression) expression;
+      checkHasLocalReader(singleSeriesExp, metaGroupMember);
+      return new LeafNode(null);
+    } else {
+      Node leftChild =
+          constructNode(((IBinaryExpression) expression).getLeft(), metaGroupMember, isAscending);
+      Node rightChild =
+          constructNode(((IBinaryExpression) expression).getRight(), metaGroupMember, isAscending);
+
+      if (expression.getType() == ExpressionType.OR) {
+        return new OrNode(leftChild, rightChild, isAscending);
+      } else if (expression.getType() == ExpressionType.AND) {
+        return new AndNode(leftChild, rightChild, isAscending);
+      }
+      throw new UnSupportedDataTypeException(
+          "Unsupported ExpressionType when construct OperatorNode: " + expression.getType());
+    }
+  }
+
+  private void checkHasLocalReader(
+      SingleSeriesExpression expression, MetaGroupMember metaGroupMember) throws IOException {
+    Filter filter = expression.getFilter();
+    PartialPath path = (PartialPath) expression.getSeriesPath();
+    TSDataType dataType;
+    try {
+      dataType =
+          ((CMManager) IoTDB.metaManager)
+              .getSeriesTypesByPaths(Collections.singletonList(path), null)
+              .left
+              .get(0);
+
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(null, path);
+      for (PartitionGroup partitionGroup : partitionGroups) {
+        if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+          DataGroupMember dataGroupMember =
+              metaGroupMember.getLocalDataMember(
+                  partitionGroup.getHeader(),
+                  String.format(
+                      "Query: %s, time filter: %s, queryId: %d", path, null, context.getQueryId()));
+
+          IPointReader pointReader =
+              readerFactory.getSeriesPointReader(
+                  path,
+                  queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+                  dataType,
+                  null,
+                  filter,
+                  context,
+                  dataGroupMember,
+                  queryPlan.isAscending());
+
+          if (pointReader.hasNextTimeValuePair()) {
+            this.hasLocalReader = true;
+            this.endPoint = null;
+            pointReader.close();
+            break;
+          }
+          pointReader.close();
+        } else if (endPoint == null) {
+          endPoint =
+              new QueryDataSet.EndPoint(
+                  partitionGroup.getHeader().getClientIp(),
+                  partitionGroup.getHeader().getClientPort());
+        }
+      }
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index aa48624..bf88b89 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.query;
 
 import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,18 +28,40 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class ClusterDataQueryExecutorTest extends BaseQueryTest {
 
   private ClusterDataQueryExecutor queryExecutor;
 
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true);
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false);
+  }
+
   @Test
   public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException {
     RawDataQueryPlan plan = new RawDataQueryPlan();
@@ -75,4 +98,64 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
+
+  @Test
+  public void testNoFilterWithRedirect() throws StorageEngineException, QueryProcessException {
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList);
+    plan.setDeduplicatedDataTypes(dataTypes);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      assertNull(dataSet.getEndPoint());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+
+  @Test
+  public void testFilterWithValueFilterRedirect()
+      throws StorageEngineException, QueryProcessException, IllegalPathException {
+    IExpression expression =
+        new SingleSeriesExpression(
+            new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0));
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList);
+    plan.setDeduplicatedDataTypes(dataTypes);
+    plan.setExpression(expression);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context);
+      assertNull(dataSet.getEndPoint());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+
+  @Test
+  public void testFilterWithTimeFilterRedirect()
+      throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+    IExpression expression =
+        new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10)));
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList.subList(0, 1));
+    plan.setDeduplicatedDataTypes(dataTypes.subList(0, 1));
+    plan.setExpression(expression);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      assertEquals("ip:port=0.0.0.0:6667", dataSet.getEndPoint().toString());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
index 417b849..402400d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
@@ -62,7 +62,7 @@ public class ClusterTimeGeneratorTest extends BaseQueryTest {
       dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(1, 1)));
 
       ClusterTimeGenerator timeGenerator =
-          new ClusterTimeGenerator(expression, context, testMetaMember, dataQueryPlan);
+          new ClusterTimeGenerator(expression, context, testMetaMember, dataQueryPlan, false);
       for (int i = 3; i <= 8; i++) {
         assertTrue(timeGenerator.hasNext());
         assertEquals(i, timeGenerator.next());
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 20387de..e8da3f2 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -36,19 +36,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+@SuppressWarnings("squid:S106")
 public class SessionExample {
 
   private static Session session;
+  private static Session sessionEnableRedirect;
   private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
   private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
   private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
   private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
   private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
   private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  private static final String LOCAL_HOST = "127.0.0.1";
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
-    session = new Session("127.0.0.1", 6667, "root", "root");
+    session = new Session(LOCAL_HOST, 6667, "root", "root");
     session.open(false);
 
     // set session fetchSize
@@ -77,6 +80,17 @@ public class SessionExample {
     deleteTimeseries();
     setTimeout();
     session.close();
+
+    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+    sessionEnableRedirect.setEnableQueryRedirection(true);
+    sessionEnableRedirect.open(false);
+
+    // set session fetchSize
+    sessionEnableRedirect.setFetchSize(10000);
+
+    insertRecord4Redirect();
+    query4Redirect();
+    sessionEnableRedirect.close();
   }
 
   private static void createTimeseries()
@@ -193,6 +207,31 @@ public class SessionExample {
     }
   }
 
+  private static void insertRecord4Redirect()
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < 6; i++) {
+      for (int j = 0; j < 2; j++) {
+        String deviceId = "root.redirect" + i + ".d" + j;
+        List<String> measurements = new ArrayList<>();
+        measurements.add("s1");
+        measurements.add("s2");
+        measurements.add("s3");
+        List<TSDataType> types = new ArrayList<>();
+        types.add(TSDataType.INT64);
+        types.add(TSDataType.INT64);
+        types.add(TSDataType.INT64);
+
+        for (long time = 0; time < 5; time++) {
+          List<Object> values = new ArrayList<>();
+          values.add(1L + time);
+          values.add(2L + time);
+          values.add(3L + time);
+          session.insertRecord(deviceId, time, measurements, types, values);
+        }
+      }
+    }
+  }
+
   private static void insertStrRecord()
       throws IoTDBConnectionException, StatementExecutionException {
     String deviceId = ROOT_SG1_D1;
@@ -448,6 +487,65 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void query4Redirect()
+      throws IoTDBConnectionException, StatementExecutionException {
+    String selectPrefix = "select * from root.redirect";
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              selectPrefix + i + ".d1 where time >= 1 and time < 10");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              selectPrefix
+                  + i
+                  + ".d1 where time >= 1 and time < 10 and root.redirect"
+                  + i
+                  + "d1.s1 > 1");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+  }
+
   private static void queryWithTimeout()
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
@@ -526,7 +624,7 @@ public class SessionExample {
   }
 
   private static void setTimeout() throws StatementExecutionException {
-    Session tempSession = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+    Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000);
     tempSession.setTimeout(60000);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index bcd5b05..a188db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -42,6 +42,8 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   private Map<String, Integer> pathToIndex = new HashMap<>();
 
+  private boolean enableRedirect = false;
+
   public QueryPlan() {
     super(true);
     setOperatorType(Operator.OperatorType.QUERY);
@@ -126,4 +128,12 @@ public abstract class QueryPlan extends PhysicalPlan {
       throws IllegalPathException {
     return columnForReader;
   }
+
+  public boolean isEnableRedirect() {
+    return enableRedirect;
+  }
+
+  public void setEnableRedirect(boolean enableRedirect) {
+    this.enableRedirect = enableRedirect;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index f8ce3ea..b37a16e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -108,6 +109,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         this.dataSetType = DataSetType.QUERY;
         this.rawDataQueryPlan = new RawDataQueryPlan();
         this.rawDataQueryPlan.setAscending(alignByDevicePlan.isAscending());
+        // only redirect query for raw data query
+        this.rawDataQueryPlan.setEnableRedirect(alignByDevicePlan.isEnableRedirect());
     }
 
     this.curDataSetInitialized = false;
@@ -198,6 +201,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         throw new IOException(e);
       }
 
+      if (currentDataSet.getEndPoint() != null) {
+        org.apache.iotdb.service.rpc.thrift.EndPoint endPoint =
+            new org.apache.iotdb.service.rpc.thrift.EndPoint();
+        endPoint.setIp(currentDataSet.getEndPoint().getIp());
+        endPoint.setPort(currentDataSet.getEndPoint().getPort());
+        throw new RedirectException(endPoint);
+      }
+
       if (currentDataSet.hasNext()) {
         curDataSetInitialized = true;
         return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 63f08c1..c46fe1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -180,6 +180,17 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     init();
   }
 
+  /**
+   * Dummy dataSet for redirect query.
+   *
+   * @param queryId queryId for the query.
+   */
+  public RawQueryDataSetWithoutValueFilter(long queryId) {
+    this.queryId = queryId;
+    blockingQueueArray = new BlockingQueue[0];
+    timeHeap = new TimeSelector(0, ascending);
+  }
+
   private void init() throws IOException, InterruptedException {
     timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
     for (int i = 0; i < seriesReaderList.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 042934a..fc75d0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -61,6 +61,10 @@ public class RawDataQueryExecutor {
   /** without filter or with global time filter. */
   public QueryDataSet executeWithoutValueFilter(QueryContext context)
       throws StorageEngineException, QueryProcessException {
+    QueryDataSet dataSet = needRedirect(context, false);
+    if (dataSet != null) {
+      return dataSet;
+    }
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
     try {
       return new RawQueryDataSetWithoutValueFilter(
@@ -79,6 +83,10 @@ public class RawDataQueryExecutor {
 
   public final QueryDataSet executeNonAlign(QueryContext context)
       throws StorageEngineException, QueryProcessException {
+    QueryDataSet dataSet = needRedirect(context, false);
+    if (dataSet != null) {
+      return dataSet;
+    }
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
     return new NonAlignEngineDataSet(
         context.getQueryId(),
@@ -133,6 +141,11 @@ public class RawDataQueryExecutor {
    */
   public final QueryDataSet executeWithValueFilter(QueryContext context)
       throws StorageEngineException, QueryProcessException {
+    QueryDataSet dataSet = needRedirect(context, true);
+    if (dataSet != null) {
+      return dataSet;
+    }
+
     TimeGenerator timestampGenerator =
         getTimeGenerator(queryPlan.getExpression(), context, queryPlan);
     List<Boolean> cached =
@@ -196,4 +209,16 @@ public class RawDataQueryExecutor {
       throws StorageEngineException {
     return new ServerTimeGenerator(expression, context, queryPlan);
   }
+
+  /**
+   * Check whether need to redirect query to other node.
+   *
+   * @param context query context
+   * @param hasValueFilter if has value filter, we need to check timegenerator
+   * @return dummyDataSet to avoid more cost, if null, no need
+   */
+  protected QueryDataSet needRedirect(QueryContext context, boolean hasValueFilter)
+      throws StorageEngineException, QueryProcessException {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6aac0c7..eb7571f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,8 +83,10 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -172,6 +174,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
       "The query statement is not allowed in batch: ";
 
+  private static final String INFO_INTERRUPT_ERROR =
+      "Current Thread interrupted when dealing with request {}";
+
   private static final int MAX_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
   private static final int DELETE_SIZE = 20;
@@ -531,8 +536,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : executeUpdateStatement(physicalPlan, req.getSessionId());
+    } catch (InterruptedException e) {
+      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+      Thread.currentThread().interrupt();
+      return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
     } catch (Exception e) {
       return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
     }
@@ -557,9 +567,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+    } catch (InterruptedException e) {
+      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+      Thread.currentThread().interrupt();
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "executing executeQueryStatement"));
     } catch (Exception e) {
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "executing executeQueryStatement"));
@@ -582,9 +598,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+    } catch (InterruptedException e) {
+      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+      Thread.currentThread().interrupt();
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "executing executeRawDataQuery"));
     } catch (Exception e) {
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "executing executeRawDataQuery"));
@@ -595,14 +617,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
    *     some AuthorPlan
    */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
   private TSExecuteStatementResp internalExecuteQueryStatement(
       String statement,
       long statementId,
       PhysicalPlan plan,
       int fetchSize,
       long timeout,
-      String username)
+      String username,
+      boolean enableRedirect)
       throws QueryProcessException, SQLException, StorageEngineException,
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
@@ -644,8 +667,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
         resp = getQueryColumnHeaders(plan, username);
       }
+      if (plan instanceof QueryPlan) {
+        ((QueryPlan) plan).setEnableRedirect(enableRedirect);
+      }
       // create and cache dataset
       QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
+
+      if (newDataSet.getEndPoint() != null && enableRedirect) {
+        // redirect query
+        LOGGER.debug(
+            "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+        TSStatus status = new TSStatus();
+        status.setRedirectNode(
+            new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+        resp.setStatus(status);
+        resp.setQueryId(queryId);
+        return resp;
+      }
+
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
         resp = getListDataSetHeaders(newDataSet);
       } else if (plan instanceof UDFPlan) {
@@ -662,7 +702,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (newDataSet instanceof DirectNonAlignDataSet) {
         resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
       } else {
-        resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+        try {
+          TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+          resp.setQueryDataSet(tsQueryDataSet);
+        } catch (RedirectException e) {
+          LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+          if (enableRedirect) {
+            // redirect query
+            TSStatus status = new TSStatus();
+            status.setRedirectNode(e.getEndPoint());
+            status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+            resp.setStatus(status);
+            resp.setQueryId(queryId);
+            return resp;
+          } else {
+            LOGGER.error(
+                "execute {} error, if session does not support redirect,"
+                    + " should not throw redirection exception.",
+                statement,
+                e);
+          }
+        }
       }
       resp.setQueryId(queryId);
 
@@ -976,6 +1036,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         queryTimeManager.unRegisterQuery(req.queryId);
         return resp;
       }
+    } catch (InterruptedException e) {
+      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+      Thread.currentThread().interrupt();
+      return RpcUtils.getTSFetchResultsResp(
+          onNPEOrUnexpectedException(
+              e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
     } catch (Exception e) {
       releaseQueryResourceNoExceptions(req.queryId);
       return RpcUtils.getTSFetchResultsResp(
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index e84a96a..15f3157 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.rpc;
 
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
+import java.io.IOException;
 import java.util.Map;
 
-public class RedirectException extends Exception {
+public class RedirectException extends IOException {
 
   private final EndPoint endPoint;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b873ade..1a2fa6f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -66,6 +66,8 @@ public class Session {
   protected static final TSProtocolVersion protocolVersion =
       TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
   public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
+  public static final String MSG_DONOT_ENABLE_REDIRECT =
+      "Query do not enable redirect," + " please confirm the session and server conf.";
   protected String username;
   protected String password;
   protected int fetchSize;
@@ -94,6 +96,8 @@ public class Session {
   protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
   private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
 
+  protected boolean enableQueryRedirection = false;
+
   public Session(String host, int rpcPort) {
     this(
         host,
@@ -254,9 +258,10 @@ public class Session {
     this.enableRPCCompression = enableRPCCompression;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
     metaSessionConnection = defaultSessionConnection;
     isClosed = false;
-    if (enableCacheLeader) {
+    if (enableCacheLeader || enableQueryRedirection) {
       deviceIdToEndpoint = new HashMap<>();
       endPointToSessionConnection = new HashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
@@ -452,7 +457,7 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return defaultSessionConnection.executeQueryStatement(sql, timeout);
+    return executeStatementMayRedirect(sql, timeout);
   }
 
   /**
@@ -467,7 +472,42 @@ public class Session {
     if (timeoutInMs < 0) {
       throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
     }
-    return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+    return executeStatementMayRedirect(sql, timeoutInMs);
+  }
+
+  /**
+   * execute the query, may redirect query to other node.
+   *
+   * @param sql the query statement
+   * @param timeoutInMs time in ms
+   * @return data set
+   * @throws StatementExecutionException statement is not right
+   * @throws IoTDBConnectionException the network is not good
+   */
+  private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+      return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        logger.debug(
+            "{} redirect query {} to {}",
+            defaultSessionConnection.getEndPoint(),
+            sql,
+            e.getEndPoint());
+        // retry
+        try {
+          return defaultSessionConnection.executeQueryStatement(sql, timeout);
+        } catch (RedirectException redirectException) {
+          logger.error("{} redirect twice", sql, redirectException);
+          throw new StatementExecutionException(sql + " redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   /**
@@ -484,16 +524,32 @@ public class Session {
    * query eg. select * from paths where time >= startTime and time < endTime time interval include
    * startTime and exclude endTime
    *
-   * @param paths
+   * @param paths series path
    * @param startTime included
    * @param endTime excluded
-   * @return
-   * @throws StatementExecutionException
-   * @throws IoTDBConnectionException
+   * @return data set
+   * @throws StatementExecutionException statement is not right
+   * @throws IoTDBConnectionException the network is not good
    */
   public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
       throws StatementExecutionException, IoTDBConnectionException {
-    return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+    try {
+      return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+        // retry
+        try {
+          return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+        } catch (RedirectException redirectException) {
+          logger.error("Redirect twice", redirectException);
+          throw new StatementExecutionException("Redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   /**
@@ -585,6 +641,29 @@ public class Session {
     }
   }
 
+  private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException {
+    if (enableQueryRedirection) {
+      SessionConnection connection =
+          endPointToSessionConnection.computeIfAbsent(
+              endPoint,
+              k -> {
+                try {
+                  SessionConnection sessionConnection =
+                      constructSessionConnection(this, endPoint, zoneId);
+                  sessionConnection.setEnableRedirect(enableQueryRedirection);
+                  return sessionConnection;
+                } catch (IoTDBConnectionException ex) {
+                  tmp.set(ex);
+                  return null;
+                }
+              });
+      if (connection == null) {
+        throw new IoTDBConnectionException(tmp.get());
+      }
+      defaultSessionConnection = connection;
+    }
+  }
+
   /**
    * insert data in one row, if you want improve your performance, please use insertInBatch method
    * or insertBatch method
@@ -1454,4 +1533,12 @@ public class Session {
         throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
     }
   }
+
+  public boolean isEnableQueryRedirection() {
+    return enableQueryRedirection;
+  }
+
+  public void setEnableQueryRedirection(boolean enableQueryRedirection) {
+    this.enableQueryRedirection = enableQueryRedirection;
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 7a43483..f2b6665 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -69,6 +69,7 @@ public class SessionConnection {
   private long statementId;
   private ZoneId zoneId;
   private EndPoint endPoint;
+  private boolean enableRedirect = false;
 
   // TestOnly
   public SessionConnection() {}
@@ -254,7 +255,11 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = null;
     try {
-      dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+      try {
+        dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+      } catch (RedirectException e) {
+        throw new StatementExecutionException("need to redirect query, should not see this.", e);
+      }
       return dataSet.hasNext();
     } finally {
       if (dataSet != null) {
@@ -264,13 +269,15 @@ public class SessionConnection {
   }
 
   protected SessionDataSet executeQueryStatement(String sql, long timeout)
-      throws StatementExecutionException, IoTDBConnectionException {
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeout);
     TSExecuteStatementResp execResp;
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       execResp = client.executeQueryStatement(execReq);
+      RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -304,6 +311,7 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
       RpcUtils.verifySuccess(execResp.getStatus());
     } catch (TException e) {
@@ -322,13 +330,15 @@ public class SessionConnection {
   }
 
   protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
-      throws StatementExecutionException, IoTDBConnectionException {
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSRawDataQueryReq execReq =
         new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
     execReq.setFetchSize(session.fetchSize);
     TSExecuteStatementResp execResp;
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       execResp = client.executeRawDataQuery(execReq);
+      RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -660,4 +670,20 @@ public class SessionConnection {
     }
     return flag;
   }
+
+  public boolean isEnableRedirect() {
+    return enableRedirect;
+  }
+
+  public void setEnableRedirect(boolean enableRedirect) {
+    this.enableRedirect = enableRedirect;
+  }
+
+  public EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 83cb70c..94892fa 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -117,6 +117,8 @@ struct TSExecuteStatementReq {
   4: optional i32 fetchSize
 
   5: optional i64 timeout
+
+  6: optional bool enableRedirectQuery;
 }
 
 struct TSExecuteBatchStatementReq{
@@ -276,6 +278,7 @@ struct TSRawDataQueryReq {
   4: required i64 startTime
   5: required i64 endTime
   6: required i64 statementId
+  7: optional bool enableRedirectQuery;
 }
 
 struct TSCreateMultiTimeseriesReq {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index c68a0e0..453403e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -35,6 +35,46 @@ public abstract class QueryDataSet {
   protected int alreadyReturnedRowNum = 0;
   protected int fetchSize = 10000;
   protected boolean ascending;
+  /*
+   *  whether current data group has data for query.
+   *  If not null(must be in cluster mode),
+   *  we need to redirect the query to any data group which has some data to speed up query.
+   */
+  protected EndPoint endPoint = null;
+
+  /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */
+  public static class EndPoint {
+    private String ip = null;
+    private int port = 0;
+
+    public EndPoint(String ip, int port) {
+      this.ip = ip;
+      this.port = port;
+    }
+
+    public EndPoint() {}
+
+    public String getIp() {
+      return ip;
+    }
+
+    public void setIp(String ip) {
+      this.ip = ip;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    @Override
+    public String toString() {
+      return "ip:port=" + ip + ":" + port;
+    }
+  }
 
   public QueryDataSet() {}
 
@@ -119,4 +159,12 @@ public abstract class QueryDataSet {
   public boolean hasLimit() {
     return rowLimit > 0;
   }
+
+  public EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }