You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/03/18 05:25:57 UTC

[iotdb] 01/01: IOTDB-1241 support redirect query for cluster

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch support_redirect_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 68048a4ca1c979325b98a664ab3053d02dde3f57
Author: chaow <xu...@gmail.com>
AuthorDate: Thu Mar 18 11:54:49 2021 +0800

    IOTDB-1241 support redirect query for cluster
---
 .../resources/conf/iotdb-cluster.properties        |  2 +
 .../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  5 ++
 .../apache/iotdb/cluster/metadata/CMManager.java   |  7 +-
 .../cluster/query/ClusterDataQueryExecutor.java    | 65 ++++++++++++++-
 .../cluster/query/reader/ClusterReaderFactory.java | 36 +++++++-
 .../cluster/query/reader/ClusterTimeGenerator.java | 37 +++++++--
 .../cluster/query/reader/ManagedMergeReader.java   | 13 +++
 .../cluster/query/reader/MergedReaderByTime.java   | 10 +++
 .../query/ClusterDataQueryExecutorTest.java        | 85 +++++++++++++++++++
 .../main/java/org/apache/iotdb/SessionExample.java | 97 ++++++++++++++++++++++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       | 10 +++
 .../db/query/dataset/AlignByDeviceDataSet.java     | 11 +++
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 11 +++
 .../db/query/executor/RawDataQueryExecutor.java    | 22 +++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 52 ++++++++++--
 .../org/apache/iotdb/rpc/RedirectException.java    |  3 +-
 .../java/org/apache/iotdb/session/Session.java     | 81 +++++++++++++++++-
 .../apache/iotdb/session/SessionConnection.java    | 32 ++++++-
 thrift/src/main/thrift/rpc.thrift                  |  3 +
 .../tsfile/read/query/dataset/QueryDataSet.java    | 40 +++++++++
 21 files changed, 603 insertions(+), 29 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 1627d43..87283a1 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -166,3 +166,5 @@ max_client_pernode_permember_number=1000
 # we need to wait so much time for other connections to be released until timeout,
 # or a new connection will be created.
 wait_client_timeout_ms=5000
+
+enable_query_redirect=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 08f7608..e8a3953 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -161,6 +161,8 @@ public class ClusterConfig {
 
   private boolean openServerRpcPort = false;
 
+  private boolean enableQueryRedirect = false;
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
@@ -456,4 +458,12 @@ public class ClusterConfig {
   public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
     this.waitClientTimeoutMS = waitClientTimeoutMS;
   }
+
+  public boolean isEnableQueryRedirect() {
+    return enableQueryRedirect;
+  }
+
+  public void setEnableQueryRedirect(boolean enableQueryRedirect) {
+    this.enableQueryRedirect = enableQueryRedirect;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d6bbf82..799a7a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -293,6 +293,11 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS()))));
 
+    config.setEnableQueryRedirect(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 6320432..cd572b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1241,11 +1241,12 @@ public class CMManager extends MManager {
       try {
         Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
         logger.debug(
-            "{}: get matched paths of {} from {}, result {}",
+            "{}: get matched paths of {} from {}, result {} for {}",
             metaGroupMember.getName(),
             partitionGroup,
             node,
-            paths);
+            paths,
+            pathsToQuery);
         if (paths != null) {
           // query next group
           Set<PartialPath> partialPaths = new HashSet<>();
@@ -1397,7 +1398,7 @@ public class CMManager extends MManager {
   public Set<String> getAllDevices(List<String> paths) throws MetadataException {
     Set<String> results = new HashSet<>();
     for (String path : paths) {
-      getAllTimeseriesPath(new PartialPath(path)).stream()
+      getDevices(new PartialPath(path)).stream()
           .map(PartialPath::getFullPath)
           .forEach(results::add);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index fbf8963..d2e8737 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -23,12 +23,15 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
 import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.query.reader.ManagedMergeReader;
+import org.apache.iotdb.cluster.query.reader.MergedReaderByTime;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
 import org.apache.iotdb.db.query.executor.RawDataQueryExecutor;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -36,6 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 import org.slf4j.Logger;
@@ -51,6 +55,8 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
   private static final Logger logger = LoggerFactory.getLogger(ClusterDataQueryExecutor.class);
   private MetaGroupMember metaGroupMember;
   private ClusterReaderFactory readerFactory;
+  private QueryDataSet.EndPoint endPoint = null;
+  private boolean hasLocalReader = false;
 
   ClusterDataQueryExecutor(RawDataQueryPlan plan, MetaGroupMember metaGroupMember) {
     super(plan);
@@ -74,6 +80,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     }
 
     List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+    hasLocalReader = false;
     for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
       PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -95,9 +102,22 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
       }
 
       readersOfSelectedSeries.add(reader);
+      if (((ManagedMergeReader) reader).getEndPoint() != null) {
+        endPoint = ((ManagedMergeReader) reader).getEndPoint();
+      } else {
+        hasLocalReader = true;
+      }
     }
     if (logger.isDebugEnabled()) {
-      logger.debug("Initialized {} readers for {}", readersOfSelectedSeries.size(), queryPlan);
+      logger.debug(
+          "Initialized {} readers for {} has localReader {}",
+          readersOfSelectedSeries.size(),
+          queryPlan,
+          hasLocalReader);
+    }
+    if (hasLocalReader) {
+      // no need to redirect query
+      endPoint = null;
     }
     return readersOfSelectedSeries;
   }
@@ -106,8 +126,15 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
   protected IReaderByTimestamp getReaderByTimestamp(
       PartialPath path, Set<String> deviceMeasurements, TSDataType dataType, QueryContext context)
       throws StorageEngineException, QueryProcessException {
-    return readerFactory.getReaderByTimestamp(
-        path, deviceMeasurements, dataType, context, queryPlan.isAscending());
+    IReaderByTimestamp iReaderByTimestamp =
+        readerFactory.getReaderByTimestamp(
+            path, deviceMeasurements, dataType, context, queryPlan.isAscending());
+    if (((MergedReaderByTime) iReaderByTimestamp).getEndPoint() == null) {
+      hasLocalReader = true;
+    } else if (!hasLocalReader && endPoint == null) {
+      endPoint = ((MergedReaderByTime) iReaderByTimestamp).getEndPoint();
+    }
+    return iReaderByTimestamp;
   }
 
   @Override
@@ -116,4 +143,36 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
       throws StorageEngineException {
     return new ClusterTimeGenerator(queryExpression, context, metaGroupMember, rawDataQueryPlan);
   }
+
+  public QueryDataSet.EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
+
+  @Override
+  protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) {
+    ClusterTimeGenerator clusterTimeGenerator = (ClusterTimeGenerator) timeGenerator;
+    logger.debug(
+        "redirect queryId {}, {}, {}, {}, {}",
+        queryId,
+        hasLocalReader,
+        queryPlan.getOperatorType(),
+        endPoint,
+        clusterTimeGenerator);
+    if (hasLocalReader
+        || !queryPlan.isEnableRedirect()
+        || (clusterTimeGenerator != null && clusterTimeGenerator.isHasLocalReader())) {
+      endPoint = null;
+    }
+    if (!hasLocalReader && endPoint != null && queryPlan.isEnableRedirect()) {
+      // dummy dataSet
+      QueryDataSet dataSet = new RawQueryDataSetWithoutValueFilter(queryId);
+      dataSet.setEndPoint(endPoint);
+      return dataSet;
+    }
+    return null;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 91819b1..a68881a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
@@ -119,14 +120,29 @@ public class ClusterReaderFactory {
         path,
         partitionGroups.size());
     List<IReaderByTimestamp> readers = new ArrayList<>(partitionGroups.size());
+    QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint();
+    boolean hasLocalReader = false;
     for (PartitionGroup partitionGroup : partitionGroups) {
       // query each group to get a reader in that group
-      readers.add(
+      IReaderByTimestamp iReaderByTimestamp =
           getSeriesReaderByTime(
-              partitionGroup, path, deviceMeasurements, context, dataType, ascending));
+              partitionGroup, path, deviceMeasurements, context, dataType, ascending);
+      readers.add(iReaderByTimestamp);
+      if (iReaderByTimestamp instanceof SeriesReaderByTimestamp) {
+        hasLocalReader = true;
+      } else if (iReaderByTimestamp instanceof RemoteSeriesReaderByTimestamp) {
+        endPoint.setIp(partitionGroup.getHeader().getClientIp());
+        endPoint.setPort(partitionGroup.getHeader().getClientPort());
+      }
+    }
+    if (hasLocalReader) {
+      // a local reader exists, so the query does not need to be redirected to an endpoint
+      endPoint = null;
     }
     // merge the readers
-    return new MergedReaderByTime(readers);
+    MergedReaderByTime mergedReaderByTime = new MergedReaderByTime(readers);
+    mergedReaderByTime.setEndPoint(endPoint);
+    return mergedReaderByTime;
   }
 
   /**
@@ -227,6 +243,8 @@ public class ClusterReaderFactory {
         path,
         partitionGroups.size());
     ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+    QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint();
+    boolean hasLocalReader = false;
     try {
       // build a reader for each group and merge them
       for (PartitionGroup partitionGroup : partitionGroups) {
@@ -240,11 +258,23 @@ public class ClusterReaderFactory {
                 context,
                 dataType,
                 ascending);
+        if (seriesReader instanceof SeriesRawDataPointReader
+            && seriesReader.hasNextTimeValuePair()) {
+          hasLocalReader = true;
+        } else if (seriesReader instanceof RemoteSimpleSeriesReader) {
+          endPoint.setIp(partitionGroup.getHeader().getClientIp());
+          endPoint.setPort(partitionGroup.getHeader().getClientPort());
+        }
         mergeReader.addReader(seriesReader, 0);
       }
     } catch (IOException | QueryProcessException e) {
       throw new StorageEngineException(e);
     }
+    if (hasLocalReader) {
+      // a local reader exists, so the query does not need to be redirected to an endpoint
+      endPoint = null;
+    }
+    mergeReader.setEndPoint(endPoint);
     return mergeReader;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index b7b5f51..8f88a23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -40,6 +41,7 @@ import java.util.Collections;
 public class ClusterTimeGenerator extends ServerTimeGenerator {
 
   private ClusterReaderFactory readerFactory;
+  private boolean hasLocalReader = false;
 
   /** Constructor of EngineTimeGenerator. */
   public ClusterTimeGenerator(
@@ -65,22 +67,41 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
     Filter filter = expression.getFilter();
     PartialPath path = (PartialPath) expression.getSeriesPath();
     TSDataType dataType;
+    ManagedSeriesReader mergeReader = null;
     try {
       dataType =
           ((CMManager) IoTDB.metaManager)
               .getSeriesTypesByPaths(Collections.singletonList(path), null)
               .left
               .get(0);
-      return readerFactory.getSeriesReader(
-          path,
-          queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-          dataType,
-          null,
-          filter,
-          context,
-          queryPlan.isAscending());
+      mergeReader =
+          readerFactory.getSeriesReader(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              dataType,
+              null,
+              filter,
+              context,
+              queryPlan.isAscending());
     } catch (Exception e) {
       throw new IOException(e);
     }
+    if (((ManagedMergeReader) mergeReader).getEndPoint() == null) {
+      hasLocalReader = true;
+    }
+    return mergeReader;
+  }
+
+  public boolean isHasLocalReader() {
+    return hasLocalReader;
+  }
+
+  public void setHasLocalReader(boolean hasLocalReader) {
+    this.hasLocalReader = hasLocalReader;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", has local reader:" + hasLocalReader;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
index e54dede..acaffe4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
@@ -38,6 +39,10 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
 
   private BatchData batchData;
   private TSDataType dataType;
+  /*
+   * the endpoint this query should be redirected to; null means no redirection is needed
+   */
+  private QueryDataSet.EndPoint endPoint = null;
 
   public ManagedMergeReader(TSDataType dataType) {
     this.dataType = dataType;
@@ -91,4 +96,12 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
     batchData = null;
     return ret;
   }
+
+  public QueryDataSet.EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..ae215f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.query.reader;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 import java.util.List;
@@ -27,6 +28,7 @@ import java.util.List;
 public class MergedReaderByTime implements IReaderByTimestamp {
 
   private List<IReaderByTimestamp> innerReaders;
+  private QueryDataSet.EndPoint endPoint = null;
 
   public MergedReaderByTime(List<IReaderByTimestamp> innerReaders) {
     this.innerReaders = innerReaders;
@@ -44,4 +46,12 @@ public class MergedReaderByTime implements IReaderByTimestamp {
     }
     return null;
   }
+
+  public QueryDataSet.EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index aa48624..bee1d3a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.query;
 
 import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,18 +28,40 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class ClusterDataQueryExecutorTest extends BaseQueryTest {
 
   private ClusterDataQueryExecutor queryExecutor;
 
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true);
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false);
+  }
+
   @Test
   public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException {
     RawDataQueryPlan plan = new RawDataQueryPlan();
@@ -75,4 +98,66 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
+
+  @Test
+  public void testNoFilterWithRedirect()
+      throws IOException, StorageEngineException, QueryProcessException {
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList);
+    plan.setDeduplicatedDataTypes(dataTypes);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      System.out.println(dataSet.getEndPoint());
+      assertNull(dataSet.getEndPoint());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+
+  @Test
+  public void testFilterWithValueFilterRedirect()
+      throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+    IExpression expression =
+        new SingleSeriesExpression(
+            new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0));
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList);
+    plan.setDeduplicatedDataTypes(dataTypes);
+    plan.setExpression(expression);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context);
+      assertNull(dataSet.getEndPoint());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+
+  @Test
+  public void testFilterWithTimeFilterRedirect()
+      throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+    IExpression expression =
+        new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10)));
+    RawDataQueryPlan plan = new RawDataQueryPlan();
+    plan.setDeduplicatedPaths(pathList.subList(0, 1));
+    plan.setDeduplicatedDataTypes(dataTypes.subList(0, 1));
+    plan.setExpression(expression);
+    plan.setEnableRedirect(true);
+    queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+    RemoteQueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      assertEquals("ip:port=0.0.0.0:6667", dataSet.getEndPoint().toString());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
 }
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 20387de..9868e26 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -39,6 +39,7 @@ import java.util.Random;
 public class SessionExample {
 
   private static Session session;
+  private static Session sessionEnableRedirect;
   private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
   private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
   private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
@@ -77,6 +78,17 @@ public class SessionExample {
     deleteTimeseries();
     setTimeout();
     session.close();
+
+    sessionEnableRedirect = new Session("127.0.0.1", 6667, "root", "root");
+    sessionEnableRedirect.setEnableQueryRedirection(true);
+    sessionEnableRedirect.open(false);
+
+    // set session fetchSize
+    sessionEnableRedirect.setFetchSize(10000);
+
+    insertRecord4Redirect();
+    query4Redirect();
+    sessionEnableRedirect.close();
   }
 
   private static void createTimeseries()
@@ -193,6 +205,31 @@ public class SessionExample {
     }
   }
 
+  private static void insertRecord4Redirect()
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < 6; i++) {
+      for (int j = 0; j < 2; j++) {
+        String deviceId = "root.redirect" + i + ".d" + j;
+        List<String> measurements = new ArrayList<>();
+        List<TSDataType> types = new ArrayList<>();
+        measurements.add("s1");
+        measurements.add("s2");
+        measurements.add("s3");
+        types.add(TSDataType.INT64);
+        types.add(TSDataType.INT64);
+        types.add(TSDataType.INT64);
+        // the shared `session` is already closed when this runs; use sessionEnableRedirect
+        for (long time = 0; time < 5; time++) {
+          List<Object> values = new ArrayList<>();
+          values.add(1L + time);
+          values.add(2L + time);
+          values.add(3L + time);
+          sessionEnableRedirect.insertRecord(deviceId, time, measurements, types, values);
+        }
+      }
+    }
+  }
+
   private static void insertStrRecord()
       throws IoTDBConnectionException, StatementExecutionException {
     String deviceId = ROOT_SG1_D1;
@@ -448,6 +485,66 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void query4Redirect()
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement("select * from root.redirect" + i + ".d1");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+    // same raw query, now restricted by a time filter
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              "select * from root.redirect" + i + ".d1 where time >= 1 and time < 10");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+    // time filter combined with "align by device"
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              "select * from root.redirect"
+                  + i
+                  + ".d1 where time >= 1 and time < 10 align by device");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+    // time filter plus a value filter on root.redirect<i>.d1.s1
+    for (int i = 0; i < 6; i++) {
+      SessionDataSet dataSet =
+          sessionEnableRedirect.executeQueryStatement(
+              "select * from root.redirect"
+                  + i
+                  + ".d1 where time >= 1 and time < 10 and root.redirect"
+                  + i
+                  + ".d1.s1 > 1");
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024); // default is 10000
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+
+      dataSet.closeOperationHandle();
+    }
+  }
+
   private static void queryWithTimeout()
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index bcd5b05..a188db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -42,6 +42,8 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   private Map<String, Integer> pathToIndex = new HashMap<>();
 
+  private boolean enableRedirect = false;
+
   public QueryPlan() {
     super(true);
     setOperatorType(Operator.OperatorType.QUERY);
@@ -126,4 +128,12 @@ public abstract class QueryPlan extends PhysicalPlan {
       throws IllegalPathException {
     return columnForReader;
   }
+
+  public boolean isEnableRedirect() {
+    return enableRedirect;
+  }
+
+  public void setEnableRedirect(boolean enableRedirect) {
+    this.enableRedirect = enableRedirect;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index f8ce3ea..b37a16e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -108,6 +109,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         this.dataSetType = DataSetType.QUERY;
         this.rawDataQueryPlan = new RawDataQueryPlan();
         this.rawDataQueryPlan.setAscending(alignByDevicePlan.isAscending());
+        // only redirect query for raw data query
+        this.rawDataQueryPlan.setEnableRedirect(alignByDevicePlan.isEnableRedirect());
     }
 
     this.curDataSetInitialized = false;
@@ -198,6 +201,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         throw new IOException(e);
       }
 
+      if (currentDataSet.getEndPoint() != null) {
+        org.apache.iotdb.service.rpc.thrift.EndPoint endPoint =
+            new org.apache.iotdb.service.rpc.thrift.EndPoint();
+        endPoint.setIp(currentDataSet.getEndPoint().getIp());
+        endPoint.setPort(currentDataSet.getEndPoint().getPort());
+        throw new RedirectException(endPoint);
+      }
+
       if (currentDataSet.hasNext()) {
         curDataSetInitialized = true;
         return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 63f08c1..a707063 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -180,6 +180,17 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     init();
   }
 
+  /**
+   * Builds a placeholder data set for a redirected query: it only records the query id and
+   * creates an empty blocking-queue array and a TimeSelector built with 0; init() is not called.
+   * @param queryId id of the query this placeholder belongs to
+   */
+  public RawQueryDataSetWithoutValueFilter(long queryId) {
+    this.queryId = queryId;
+    blockingQueueArray = new BlockingQueue[0];
+    timeHeap = new TimeSelector(0, ascending);
+  }
+
   private void init() throws IOException, InterruptedException {
     timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
     for (int i = 0; i < seriesReaderList.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 042934a..cb76e20 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -62,6 +62,10 @@ public class RawDataQueryExecutor {
   public QueryDataSet executeWithoutValueFilter(QueryContext context)
       throws StorageEngineException, QueryProcessException {
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
+    QueryDataSet dataSet = needRedirect(context.getQueryId(), null);
+    if (dataSet != null) {
+      return dataSet;
+    }
     try {
       return new RawQueryDataSetWithoutValueFilter(
           context.getQueryId(),
@@ -80,6 +84,10 @@ public class RawDataQueryExecutor {
   public final QueryDataSet executeNonAlign(QueryContext context)
       throws StorageEngineException, QueryProcessException {
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
+    QueryDataSet dataSet = needRedirect(context.getQueryId(), null);
+    if (dataSet != null) {
+      return dataSet;
+    }
     return new NonAlignEngineDataSet(
         context.getQueryId(),
         queryPlan.getDeduplicatedPaths(),
@@ -142,6 +150,10 @@ public class RawDataQueryExecutor {
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
         initSeriesReaderByTimestamp(context, queryPlan, cached);
+    QueryDataSet dataSet = needRedirect(context.getQueryId(), timestampGenerator);
+    if (dataSet != null) {
+      return dataSet;
+    }
     return new RawQueryDataSetWithValueFilter(
         queryPlan.getDeduplicatedPaths(),
         queryPlan.getDeduplicatedDataTypes(),
@@ -196,4 +208,14 @@ public class RawDataQueryExecutor {
       throws StorageEngineException {
     return new ServerTimeGenerator(expression, context, queryPlan);
   }
+
+  /**
+   * Checks whether the query must be redirected to another node. This version never redirects.
+   * @param queryId id of the query (per the original note, usable to cancel the query)
+   * @param timeGenerator time generator of the query, or null when the caller has none
+   * @return a dummy data set if the query must be redirected, or null if no redirect is needed
+   */
+  protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 031b2bc..95119c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,8 +83,10 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -531,7 +533,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : executeUpdateStatement(physicalPlan, req.getSessionId());
     } catch (Exception e) {
       return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
@@ -557,7 +560,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
     } catch (Exception e) {
@@ -582,7 +586,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()))
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
     } catch (Exception e) {
@@ -602,7 +607,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       PhysicalPlan plan,
       int fetchSize,
       long timeout,
-      String username)
+      String username,
+      boolean enableRedirect)
       throws QueryProcessException, SQLException, StorageEngineException,
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
@@ -644,8 +650,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
         resp = getQueryColumnHeaders(plan, username);
       }
+      if (plan instanceof QueryPlan) {
+        ((QueryPlan) plan).setEnableRedirect(enableRedirect);
+      }
       // create and cache dataset
       QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
+
+      if (newDataSet.getEndPoint() != null && enableRedirect) {
+        // redirect query
+        LOGGER.debug(
+            "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+        TSStatus status = new TSStatus();
+        status.setRedirectNode(
+            new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+        resp.setStatus(status);
+        resp.setQueryId(queryId);
+        return resp;
+      }
+
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
         resp = getListDataSetHeaders(newDataSet);
       } else if (plan instanceof UDFPlan) {
@@ -662,7 +685,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (newDataSet instanceof DirectNonAlignDataSet) {
         resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
       } else {
-        resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+        try {
+          TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+          resp.setQueryDataSet(tsQueryDataSet);
+        } catch (RedirectException e) {
+          LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+          if (enableRedirect) {
+            // redirect query
+            TSStatus status = new TSStatus();
+            status.setRedirectNode(e.getEndPoint());
+            status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+            resp.setStatus(status);
+            resp.setQueryId(queryId);
+            return resp;
+          } else {
+            LOGGER.error(
+                "execute {} error, if session not support redirect, should not throw redirection exception",
+                statement,
+                e);
+          }
+        }
       }
       resp.setQueryId(queryId);
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index e84a96a..15f3157 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.rpc;
 
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
+import java.io.IOException;
 import java.util.Map;
 
-public class RedirectException extends Exception {
+public class RedirectException extends IOException {
 
   private final EndPoint endPoint;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b873ade..c3db5e4 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -94,6 +94,8 @@ public class Session {
   protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
   private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
 
+  protected boolean enableQueryRedirection = false;
+
   public Session(String host, int rpcPort) {
     this(
         host,
@@ -254,6 +256,7 @@ public class Session {
     this.enableRPCCompression = enableRPCCompression;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
     metaSessionConnection = defaultSessionConnection;
     isClosed = false;
     if (enableCacheLeader) {
@@ -452,7 +455,29 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return defaultSessionConnection.executeQueryStatement(sql, timeout);
+    try {
+      logger.debug("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+      return defaultSessionConnection.executeQueryStatement(sql, timeout);
+    } catch (RedirectException e) {
+      if (enableQueryRedirection) {
+        logger.debug(
+            "{} redirect query {} to {}",
+            defaultSessionConnection.getEndPoint(),
+            sql,
+            e.getEndPoint());
+        // log first: the call below replaces defaultSessionConnection; then retry exactly once
+        handleQueryRedirection(e.getEndPoint());
+        try {
+          return defaultSessionConnection.executeQueryStatement(sql, timeout);
+        } catch (RedirectException redirectException) {
+          logger.error("{} redirect twice", sql, redirectException);
+          throw new StatementExecutionException(sql + " redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(
+            "raw data query do not support redirect, please confirm the session and server conf.");
+      }
+    }
   }
 
   /**
@@ -467,7 +492,24 @@ public class Session {
     if (timeoutInMs < 0) {
       throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
     }
-    return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+    try {
+      return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        logger.debug("redirect query {} to {}", sql, e.getEndPoint());
+        // retry
+        try {
+          return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+        } catch (RedirectException redirectException) {
+          logger.error("{} redirect twice", sql, redirectException);
+          throw new StatementExecutionException(sql + " redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(
+            "raw data query do not support redirect, please confirm the session and server conf.");
+      }
+    }
   }
 
   /**
@@ -493,7 +535,24 @@ public class Session {
    */
   public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
       throws StatementExecutionException, IoTDBConnectionException {
-    return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+    try {
+      return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+        // retry exactly once; a second redirect is reported as a StatementExecutionException
+        try {
+          return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+        } catch (RedirectException redirectException) {
+          logger.error("Redirect twice", redirectException);
+          throw new StatementExecutionException("Redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(
+            "raw data query do not support redirect, please confirm the session and server conf.");
+      }
+    }
   }
 
   /**
@@ -585,6 +644,29 @@ public class Session {
     }
   }
 
+  /**
+   * Switches the default connection to the node the server redirected the query to. Does
+   * nothing when query redirection is disabled.
+   *
+   * @param endPoint node that should serve the query from now on
+   * @throws IoTDBConnectionException if opening the new connection or closing the old one fails
+   */
+  private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException {
+    if (!enableQueryRedirection) {
+      return;
+    }
+    // Connect first: if this throws, the session still holds its working default connection.
+    SessionConnection redirected = constructSessionConnection(this, endPoint, zoneId);
+    redirected.setEnableRedirect(enableQueryRedirection);
+    SessionConnection previous = defaultSessionConnection;
+    defaultSessionConnection = redirected;
+    if (metaSessionConnection == previous) {
+      // the meta connection may alias the default one; do not leave it on a closed connection
+      metaSessionConnection = redirected;
+    }
+    previous.close();
+  }
+
   /**
    * insert data in one row, if you want improve your performance, please use insertInBatch method
    * or insertBatch method
@@ -1454,4 +1521,12 @@ public class Session {
         throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
     }
   }
+
+  public boolean isEnableQueryRedirection() {
+    return enableQueryRedirection;
+  }
+
+  public void setEnableQueryRedirection(boolean enableQueryRedirection) {
+    this.enableQueryRedirection = enableQueryRedirection;
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 7a43483..f2b6665 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -69,6 +69,7 @@ public class SessionConnection {
   private long statementId;
   private ZoneId zoneId;
   private EndPoint endPoint;
+  private boolean enableRedirect = false;
 
   // TestOnly
   public SessionConnection() {}
@@ -254,7 +255,11 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = null;
     try {
-      dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+      try {
+        dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+      } catch (RedirectException e) {
+        throw new StatementExecutionException("need to redirect query, should not see this.", e);
+      }
       return dataSet.hasNext();
     } finally {
       if (dataSet != null) {
@@ -264,13 +269,15 @@ public class SessionConnection {
   }
 
   protected SessionDataSet executeQueryStatement(String sql, long timeout)
-      throws StatementExecutionException, IoTDBConnectionException {
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeout);
     TSExecuteStatementResp execResp;
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       execResp = client.executeQueryStatement(execReq);
+      RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -304,6 +311,7 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
       RpcUtils.verifySuccess(execResp.getStatus());
     } catch (TException e) {
@@ -322,13 +330,15 @@ public class SessionConnection {
   }
 
   protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
-      throws StatementExecutionException, IoTDBConnectionException {
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSRawDataQueryReq execReq =
         new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
     execReq.setFetchSize(session.fetchSize);
     TSExecuteStatementResp execResp;
     try {
+      execReq.setEnableRedirectQuery(enableRedirect);
       execResp = client.executeRawDataQuery(execReq);
+      RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -660,4 +670,20 @@ public class SessionConnection {
     }
     return flag;
   }
+
+  public boolean isEnableRedirect() {
+    return enableRedirect;
+  }
+
+  public void setEnableRedirect(boolean enableRedirect) {
+    this.enableRedirect = enableRedirect;
+  }
+
+  public EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 83cb70c..94892fa 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -117,6 +117,8 @@ struct TSExecuteStatementReq {
   4: optional i32 fetchSize
 
   5: optional i64 timeout
+
+  6: optional bool enableRedirectQuery;
 }
 
 struct TSExecuteBatchStatementReq{
@@ -276,6 +278,7 @@ struct TSRawDataQueryReq {
   4: required i64 startTime
   5: required i64 endTime
   6: required i64 statementId
+  7: optional bool enableRedirectQuery;
 }
 
 struct TSCreateMultiTimeseriesReq {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index eb7a206..4f25739 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -34,6 +34,38 @@ public abstract class QueryDataSet {
   protected int rowOffset = 0;
   protected int alreadyReturnedRowNum = 0;
   protected boolean ascending;
+  /*
+   * Redirect target for the query; null means the query is served here without redirection.
+   * A non-null value (cluster mode only) names a node of a data group that has data for it.
+   */
+  protected EndPoint endPoint = null;
+
+  /** Redirect target of a query. Must be kept consistent with EndPoint in rpc.thrift. */
+  public static class EndPoint {
+    private String ip = null;
+    private int port = 0;
+
+    public String getIp() {
+      return ip;
+    }
+
+    public void setIp(String ip) {
+      this.ip = ip;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    @Override
+    public String toString() {
+      return "ip:port=" + ip + ":" + port;
+    }
+  }
 
   public QueryDataSet() {}
 
@@ -114,4 +146,12 @@ public abstract class QueryDataSet {
   public boolean hasLimit() {
     return rowLimit > 0;
   }
+
+  public EndPoint getEndPoint() {
+    return endPoint;
+  }
+
+  public void setEndPoint(EndPoint endPoint) {
+    this.endPoint = endPoint;
+  }
 }