You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/19 09:15:07 UTC

[iotdb] branch master updated: Optimize cluster query (#2859)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c9026e2  Optimize cluster query (#2859)
c9026e2 is described below

commit c9026e2f29fe18c08fe9d7ad3fbb541dd14f5913
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Mar 19 17:14:39 2021 +0800

    Optimize cluster query (#2859)
---
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  15 +-
 .../iotdb/cluster/query/reader/EmptyReader.java    |   3 +-
 .../cluster/query/reader/MergedReaderByTime.java   |   8 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |  40 +++--
 .../iotdb/cluster/server/DataClusterServer.java    |  13 +-
 .../cluster/server/service/DataAsyncService.java   |  10 +-
 .../cluster/server/service/DataSyncService.java    |   5 +-
 .../iotdb/cluster/common/TestAsyncDataClient.java  |   9 +-
 .../cluster/common/TestManagedSeriesReader.java    |  23 +--
 .../query/manage/ClusterQueryManagerTest.java      |   2 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  |  68 +++++---
 .../cluster/server/member/DataGroupMemberTest.java |  30 ++--
 .../cluster/server/member/MetaGroupMemberTest.java |   8 +-
 .../db/query/aggregation/AggregateResult.java      |   4 +
 .../db/query/aggregation/impl/AvgAggrResult.java   |  15 +-
 .../db/query/aggregation/impl/CountAggrResult.java |  18 ++-
 .../aggregation/impl/FirstValueAggrResult.java     |  26 +++-
 .../aggregation/impl/FirstValueDescAggrResult.java |  20 ++-
 .../aggregation/impl/LastValueAggrResult.java      |  26 ++--
 .../aggregation/impl/LastValueDescAggrResult.java  |  37 +++--
 .../query/aggregation/impl/MaxTimeAggrResult.java  |  20 ++-
 .../aggregation/impl/MaxTimeDescAggrResult.java    |  31 +++-
 .../query/aggregation/impl/MaxValueAggrResult.java |  18 ++-
 .../query/aggregation/impl/MinTimeAggrResult.java  |  24 ++-
 .../aggregation/impl/MinTimeDescAggrResult.java    |  17 +-
 .../query/aggregation/impl/MinValueAggrResult.java |  18 ++-
 .../db/query/aggregation/impl/SumAggrResult.java   |  15 +-
 .../dataset/RawQueryDataSetWithValueFilter.java    | 172 ++++++++++++++-------
 .../db/query/executor/AggregationExecutor.java     |  87 +++++++----
 .../adapter/ByTimestampReaderAdapter.java          |  48 +++---
 .../reader/chunk/DiskChunkReaderByTimestamp.java   |  55 +++----
 .../db/query/reader/series/IReaderByTimestamp.java |   2 +-
 .../reader/series/SeriesReaderByTimestamp.java     |  17 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   5 +-
 .../reader/series/SeriesReaderByTimestampTest.java |  12 +-
 thrift/src/main/thrift/cluster.thrift              |   4 +-
 .../query/dataset/DataSetWithTimeGenerator.java    |   2 +-
 .../tsfile/read/query/dataset/QueryDataSet.java    |   5 +
 .../read/query/timegenerator/TimeGenerator.java    |  52 +++++--
 .../read/query/timegenerator/node/AndNode.java     |  18 +--
 .../read/query/timegenerator/node/OrNode.java      |  42 ++---
 .../tsfile/read/reader/FakedTimeGenerator.java     |  15 +-
 42 files changed, 706 insertions(+), 353 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 028b3fc..24c7847 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -102,24 +102,19 @@ public class LocalQueryExecutor {
     return ((CMManager) IoTDB.metaManager);
   }
 
-  /**
-   * Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer".
-   *
-   * @param readerId
-   * @param time
-   */
-  public ByteBuffer fetchSingleSeriesByTimestamp(long readerId, long time)
+  /** Return the data of the reader whose id is "readerId", using the given "timestamps". */
+  public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
       throws ReaderNotFoundException, IOException {
     IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
     if (reader == null) {
       throw new ReaderNotFoundException(readerId);
     }
-    Object value = reader.getValueInTimestamp(time);
-    if (value != null) {
+    Object[] values = reader.getValuesInTimestamps(timestamps, length);
+    if (values != null) {
       ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
 
-      SerializeUtils.serializeObject(value, dataOutputStream);
+      SerializeUtils.serializeObjects(values, dataOutputStream);
       return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
     } else {
       return ByteBuffer.allocate(0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 933a9ab..3b29a69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -155,7 +156,7 @@ public class EmptyReader extends BaseManagedSeriesReader
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     return null;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..802355c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -33,12 +33,12 @@ public class MergedReaderByTime implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     for (IReaderByTimestamp innerReader : innerReaders) {
       if (innerReader != null) {
-        Object valueInTimestamp = innerReader.getValueInTimestamp(timestamp);
-        if (valueInTimestamp != null) {
-          return valueInTimestamp;
+        Object[] results = innerReader.getValuesInTimestamps(timestamps, length);
+        if (results != null) {
+          return results;
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 2b68ef4..d077f02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
@@ -49,37 +51,42 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     if (!sourceInfo.checkCurClient()) {
       return null;
     }
 
     ByteBuffer result;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      result = fetchResultAsync(timestamp);
+      result = fetchResultAsync(timestamps, length);
     } else {
-      result = fetchResultSync(timestamp);
+      result = fetchResultSync(timestamps, length);
     }
 
-    return SerializeUtils.deserializeObject(result);
+    return SerializeUtils.deserializeObjects(result);
   }
 
   @SuppressWarnings("java:S2274") // enable timeout
-  private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
+  private ByteBuffer fetchResultAsync(long[] timestamps, int length) throws IOException {
+    // convert long[] to List<Long>, which is used for thrift
+    List<Long> timestampList = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      timestampList.add(timestamps[i]);
+    }
     synchronized (fetchResult) {
       fetchResult.set(null);
       try {
         sourceInfo
             .getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeriesByTimestamp(
-                sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp, handler);
+            .fetchSingleSeriesByTimestamps(
+                sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList, handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
         // try other node
-        if (!sourceInfo.switchNode(true, timestamp)) {
+        if (!sourceInfo.switchNode(true, timestamps[0])) {
           return null;
         }
-        return fetchResultAsync(timestamp);
+        return fetchResultAsync(timestamps, length);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.warn("Query {} interrupted", sourceInfo);
@@ -89,18 +96,23 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
     return fetchResult.get();
   }
 
-  private ByteBuffer fetchResultSync(long timestamp) throws IOException {
+  private ByteBuffer fetchResultSync(long[] timestamps, int length) throws IOException {
     SyncDataClient curSyncClient = null;
+    // convert long[] to List<Long>, which is used for thrift
+    List<Long> timestampList = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      timestampList.add(timestamps[i]);
+    }
     try {
       curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
-      return curSyncClient.fetchSingleSeriesByTimestamp(
-          sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+      return curSyncClient.fetchSingleSeriesByTimestamps(
+          sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
     } catch (TException e) {
       // try other node
-      if (!sourceInfo.switchNode(true, timestamp)) {
+      if (!sourceInfo.switchNode(true, timestamps[0])) {
         return null;
       }
-      return fetchResultSync(timestamp);
+      return fetchResultSync(timestamps, length);
     } finally {
       if (curSyncClient != null) {
         ClientUtils.putBackSyncClient(curSyncClient);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 012569e..03a78a9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -384,12 +384,15 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     DataAsyncService service =
         getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
     if (service != null) {
-      service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+      service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
     }
   }
 
@@ -737,9 +740,9 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
       throws TException {
-    return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+    return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 41a2689..99fe858 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -207,13 +207,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       resultHandler.onComplete(
           dataGroupMember
               .getLocalQueryExecutor()
-              .fetchSingleSeriesByTimestamp(readerId, timestamp));
+              .fetchSingleSeriesByTimestamps(
+                  readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size()));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 555a0bc..b3c5610 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -210,12 +210,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
       throws TException {
     try {
       return dataGroupMember
           .getLocalQueryExecutor()
-          .fetchSingleSeriesByTimestamp(readerId, timestamp);
+          .fetchSingleSeriesByTimestamps(
+              readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size());
     } catch (ReaderNotFoundException | IOException e) {
       throw new TException(e);
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 661dfbf..5b2797a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -99,12 +99,15 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     new Thread(
             () ->
                 new DataAsyncService(dataGroupMemberMap.get(header))
-                    .fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler))
+                    .fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler))
         .start();
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
index 5ac3f64..cbd9bb6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
@@ -57,17 +57,22 @@ public class TestManagedSeriesReader implements ManagedSeriesReader, IReaderByTi
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) {
-    while (batchData.hasCurrent()) {
-      long currTime = batchData.currentTime();
-      if (currTime == timestamp) {
-        return batchData.currentValue();
-      } else if (currTime > timestamp) {
-        break;
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) {
+    Object[] results = new Object[length];
+    for (int i = 0; i < length; i++) {
+      while (batchData.hasCurrent()) {
+        long currTime = batchData.currentTime();
+        if (currTime == timestamps[i]) {
+          results[i] = batchData.currentValue();
+          break;
+        } else if (currTime > timestamps[i]) {
+          results[i] = null;
+          break;
+        }
+        batchData.next();
       }
-      batchData.next();
     }
-    return null;
+    return results;
   }
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
index 01a7b8b..35a404e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
@@ -79,7 +79,7 @@ public class ClusterQueryManagerTest {
 
   @Test
   public void testRegisterReaderByTime() {
-    IReaderByTimestamp reader = timestamp -> null;
+    IReaderByTimestamp reader = (timestamp, length) -> null;
     long id = queryManager.registerReaderByTime(reader);
     assertSame(reader, queryManager.getReaderByTimestamp(id));
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index a81dc66..ef2772e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -45,11 +45,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class RemoteSeriesReaderByTimestampTest {
@@ -68,10 +68,10 @@ public class RemoteSeriesReaderByTimestampTest {
           public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
             return new AsyncDataClient(null, null, node, null) {
               @Override
-              public void fetchSingleSeriesByTimestamp(
+              public void fetchSingleSeriesByTimestamps(
                   Node header,
                   long readerId,
-                  long time,
+                  List<Long> timestamps,
                   AsyncMethodCallback<ByteBuffer> resultHandler)
                   throws TException {
                 if (failedNodes.contains(node)) {
@@ -83,24 +83,24 @@ public class RemoteSeriesReaderByTimestampTest {
                           ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                           DataOutputStream dataOutputStream =
                               new DataOutputStream(byteArrayOutputStream);
-                          boolean isNull = true;
-                          while (batchData.hasCurrent()) {
-                            long currentTime = batchData.currentTime();
-                            Object value = batchData.currentValue();
-                            if (currentTime == time) {
-                              SerializeUtils.serializeObject(value, dataOutputStream);
+                          Object[] results = new Object[timestamps.size()];
+                          for (int i = 0; i < timestamps.size(); i++) {
+                            while (batchData.hasCurrent()) {
+                              long currentTime = batchData.currentTime();
+                              if (currentTime == timestamps.get(i)) {
+                                results[i] = batchData.currentValue();
+                                batchData.next();
+                                break;
+                              } else if (currentTime > timestamps.get(i)) {
+                                results[i] = null;
+                                break;
+                              }
+                              // time < timestamp, continue
                               batchData.next();
-                              isNull = false;
-                              break;
-                            } else if (currentTime > time) {
-                              break;
                             }
-                            // time < timestamp, continue
-                            batchData.next();
-                          }
-                          if (isNull) {
-                            SerializeUtils.serializeObject(null, dataOutputStream);
                           }
+                          SerializeUtils.serializeObjects(results, dataOutputStream);
+
                           resultHandler.onComplete(
                               ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
                         })
@@ -146,10 +146,16 @@ public class RemoteSeriesReaderByTimestampTest {
 
       RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
 
+      long[] times = new long[100];
       for (int i = 0; i < 100; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i] = i;
       }
-      assertNull(reader.getValueInTimestamp(101));
+      Object[] results = reader.getValuesInTimestamps(times, times.length);
+      for (int i = 0; i < 100; i++) {
+        assertEquals(i * 1.0, results[i]);
+      }
+      times[0] = 101;
+      assertEquals(null, reader.getValuesInTimestamps(times, 1)[0]);
     } finally {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
@@ -180,8 +186,13 @@ public class RemoteSeriesReaderByTimestampTest {
               + (endTime - startTime));
       // normal read
       assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+      long[] times = new long[50];
+      for (int i = 0; i < 50; i++) {
+        times[i] = i;
+      }
+      Object[] results = reader.getValuesInTimestamps(times, 50);
       for (int i = 0; i < 50; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        assertEquals(i * 1.0, results[i]);
       }
 
       endTime = System.currentTimeMillis();
@@ -191,14 +202,22 @@ public class RemoteSeriesReaderByTimestampTest {
               + (endTime - startTime));
       failedNodes.add(TestUtils.getNode(0));
       for (int i = 50; i < 80; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i - 50] = i;
+      }
+      results = reader.getValuesInTimestamps(times, 30);
+      for (int i = 50; i < 80; i++) {
+        assertEquals(i * 1.0, results[i - 50]);
       }
       assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
 
       // a bad client, change to another node again
       failedNodes.add(TestUtils.getNode(1));
       for (int i = 80; i < 90; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i - 80] = i;
+      }
+      results = reader.getValuesInTimestamps(times, 10);
+      for (int i = 80; i < 90; i++) {
+        assertEquals(i * 1.0, results[i - 80]);
       }
       assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
 
@@ -211,7 +230,8 @@ public class RemoteSeriesReaderByTimestampTest {
       failedNodes.add(TestUtils.getNode(2));
 
       try {
-        reader.getValueInTimestamp(90);
+        times[0] = 90;
+        reader.getValuesInTimestamps(times, 1);
         fail();
       } catch (IOException e) {
         assertEquals("no available client.", e.getMessage());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 84d069a..1a52082 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -803,11 +803,15 @@ public class DataGroupMemberTest extends MemberTest {
     AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
 
+    List<Long> timestamps = new ArrayList<>(5);
     for (int i = 5; i < 10; i++) {
-      new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
-      Object value = SerializeUtils.deserializeObject(dataResult.get());
-      assertEquals(i * 1.0, (Double) value, 0.00001);
+      timestamps.add((long) i);
+    }
+    new DataAsyncService(dataGroupMember)
+        .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+    Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+    for (int i = 5; i < 10; i++) {
+      assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
     }
 
     new DataAsyncService(dataGroupMember)
@@ -861,11 +865,15 @@ public class DataGroupMemberTest extends MemberTest {
 
     AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+    List<Long> timestamps = new ArrayList<>(4);
     for (int i = 5; i < 9; i++) {
-      new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
-      Object value = SerializeUtils.deserializeObject(dataResult.get());
-      assertEquals(i * 1.0, (Double) value, 0.00001);
+      timestamps.add((long) i);
+    }
+    new DataAsyncService(dataGroupMember)
+        .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+    Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+    for (int i = 5; i < 9; i++) {
+      assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
     }
 
     new DataAsyncService(dataGroupMember)
@@ -896,11 +904,13 @@ public class DataGroupMemberTest extends MemberTest {
   public void testFetchWithoutQuery() {
     System.out.println("Start testFetchWithoutQuery()");
     AtomicReference<Exception> result = new AtomicReference<>();
+    List<Long> timestamps = new ArrayList<>(1);
+    timestamps.add((long) 0);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeriesByTimestamp(
+        .fetchSingleSeriesByTimestamps(
             TestUtils.getNode(0),
             0,
-            0,
+            timestamps,
             new AsyncMethodCallback<ByteBuffer>() {
               @Override
               public void onComplete(ByteBuffer buffer) {}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 08914d1..e4e6b25 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -881,6 +881,10 @@ public class MetaGroupMemberTest extends MemberTest {
 
     try {
       ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+      long[] times = new long[10];
+      for (int i = 0; i < 10; i++) {
+        times[i] = i;
+      }
       for (int i = 0; i < 10; i++) {
         IReaderByTimestamp readerByTimestamp =
             readerFactory.getReaderByTimestamp(
@@ -889,8 +893,10 @@ public class MetaGroupMemberTest extends MemberTest {
                 TSDataType.DOUBLE,
                 context,
                 true);
+
+        Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
         for (int j = 0; j < 10; j++) {
-          assertEquals(j * 1.0, (double) readerByTimestamp.getValueInTimestamp(j), 0.00001);
+          assertEquals(j * 1.0, (double) values[j], 0.00001);
         }
       }
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7b94a93..8016be7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
 
 public abstract class AggregateResult {
 
+  public static final int TIME_LENGTH_FOR_FIRST_VALUE = 100;
   private final AggregationType aggregationType;
   protected TSDataType resultDataType;
 
@@ -94,6 +95,9 @@ public abstract class AggregateResult {
   public abstract void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
 
+  /** This method calculates the aggregation using values that have been calculated */
+  public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+
   /**
    * Judge if aggregation results have been calculated. In other words, if the aggregated result
    * does not need to compute the remaining data, it returns true.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 237fa08..23842cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -102,10 +102,19 @@ public class AvgAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        updateAvg(seriesDataType, value);
+      if (values[i] != null) {
+        updateAvg(seriesDataType, values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateAvg(seriesDataType, values[i]);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index adf0069..7b40fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -74,9 +74,23 @@ public class CountAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     int cnt = 0;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
+        cnt++;
+      }
+    }
+
+    long preValue = getLongValue();
+    preValue += cnt;
+    setLongValue(preValue);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    int cnt = 0;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
         cnt++;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 0f51dc3..56ce8c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -94,11 +94,31 @@ public class FirstValueAggrResult extends AggregateResult {
     if (hasFinalResult()) {
       return;
     }
+    int currentPos = 0;
+    long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+    while (currentPos < length) {
+      int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+      System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+      Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+      for (int i = 0; i < timeLength; i++) {
+        if (values[i] != null) {
+          setValue(values[i]);
+          timestamp = timesForFirstValue[i];
+          return;
+        }
+      }
+      currentPos += timeLength;
+    }
+  }
 
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        setValue(value);
+      if (values[i] != null) {
+        setValue(values[i]);
         timestamp = timestamps[i];
         break;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index daf5a56..8eae923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -53,11 +53,23 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        setValue(value);
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        setValue(values[i]);
         timestamp = timestamps[i];
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        setValue(values[i]);
+        timestamp = timestamps[i];
+        return;
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 0726d7a..ad06ace 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -85,18 +85,24 @@ public class LastValueAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
-    long time = Long.MIN_VALUE;
-    Object lastVal = null;
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        time = timestamps[i];
-        lastVal = value;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        timestamp = timestamps[i];
+        setValue(values[i]);
+        return;
       }
     }
-    if (time != Long.MIN_VALUE) {
-      setValue(lastVal);
-      timestamp = time;
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        timestamp = timestamps[i];
+        setValue(values[i]);
+        return;
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 15af763..587c7a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -68,19 +68,34 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
     if (hasFinalResult()) {
       return;
     }
-    long time = Long.MIN_VALUE;
-    Object lastVal = null;
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        time = timestamps[i];
-        lastVal = value;
-        break;
+    int currentPos = 0;
+    long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+    while (currentPos < length) {
+      int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+      System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+      Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+      for (int i = 0; i < timeLength; i++) {
+        if (values[i] != null) {
+          setValue(values[i]);
+          timestamp = timesForFirstValue[i];
+          return;
+        }
       }
+      currentPos += timeLength;
     }
-    if (time != Long.MIN_VALUE) {
-      setValue(lastVal);
-      timestamp = time;
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        timestamp = timestamps[i];
+        setValue(values[i]);
+        return;
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 548b249..fad90de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -66,16 +66,22 @@ public class MaxTimeAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
-    long time = -1;
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        time = timestamps[i];
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        updateMaxTimeResult(timestamps[i]);
+        return;
       }
     }
+  }
 
-    if (time != -1) {
-      updateMaxTimeResult(time);
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        updateMaxTimeResult(timestamps[i]);
+        return;
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e29a211..a69dcc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -52,17 +52,32 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
     if (hasFinalResult()) {
       return;
     }
-    long time = -1;
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        time = timestamps[i];
-        break;
+    int currentPos = 0;
+    long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+    while (currentPos < length) {
+      int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+      System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+      Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+      for (int i = 0; i < timeLength; i++) {
+        if (values[i] != null) {
+          updateMaxTimeResult(timesForFirstValue[i]);
+          return;
+        }
       }
+      currentPos += timeLength;
     }
+  }
 
-    if (time != -1) {
-      updateMaxTimeResult(time);
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateMaxTimeResult(timestamps[i]);
+        return;
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index cd00df2..34d9622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -72,13 +72,21 @@ public class MaxValueAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     Comparable<Object> maxVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value == null) {
-        continue;
+      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+        maxVal = (Comparable<Object>) values[i];
       }
-      if (maxVal == null || maxVal.compareTo(value) < 0) {
-        maxVal = (Comparable<Object>) value;
+    }
+    updateResult(maxVal);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    Comparable<Object> maxVal = null;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+        maxVal = (Comparable<Object>) values[i];
       }
     }
     updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index a0fbabd..37883d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -74,9 +74,29 @@ public class MinTimeAggrResult extends AggregateResult {
     if (hasFinalResult()) {
       return;
     }
+    int currentPos = 0;
+    long[] timesForFirstValue = new long[TIME_LENGTH_FOR_FIRST_VALUE];
+    while (currentPos < length) {
+      int timeLength = Math.min(length - currentPos, TIME_LENGTH_FOR_FIRST_VALUE);
+      System.arraycopy(timestamps, currentPos, timesForFirstValue, 0, timeLength);
+      Object[] values = dataReader.getValuesInTimestamps(timesForFirstValue, timeLength);
+      for (int i = 0; i < timeLength; i++) {
+        if (values[i] != null) {
+          setLongValue(timesForFirstValue[i]);
+          return;
+        }
+      }
+      currentPos += timeLength;
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         setLongValue(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 1bafc56..6b45f14 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -43,10 +43,21 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
-    for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
         setLongValue(timestamps[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        setLongValue(timestamps[i]);
+        return;
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 8b17d75..eefbbb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -67,13 +67,21 @@ public class MinValueAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     Comparable<Object> minVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value == null) {
-        continue;
+      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+        minVal = (Comparable<Object>) values[i];
       }
-      if (minVal == null || minVal.compareTo(value) > 0) {
-        minVal = (Comparable<Object>) value;
+    }
+    updateResult(minVal);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    Comparable<Object> minVal = null;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+        minVal = (Comparable<Object>) values[i];
       }
     }
     updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 475e953..8a11502 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -80,10 +80,19 @@ public class SumAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        updateSum(value);
+      if (values[i] != null) {
+        updateSum(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateSum(values[i]);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index bf48f10..0a02f94 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -35,9 +35,10 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
   private final List<IReaderByTimestamp> seriesReaderByTimestampList;
   private final List<Boolean> cached;
 
-  private boolean hasCachedRow;
-  private RowRecord cachedRowRecord;
-  private Object[] cachedRowInObjects;
+  private List<RowRecord> cachedRowRecords = new ArrayList<>();
+
+  /** Used for UDF. */
+  private List<Object[]> cachedRowInObjects = new ArrayList<>();
 
   /**
    * constructor of EngineDataSetWithValueFilter.
@@ -64,61 +65,85 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public boolean hasNextWithoutConstraint() throws IOException {
-    if (hasCachedRow) {
+    if (!cachedRowRecords.isEmpty()) {
       return true;
     }
-    return cacheRowRecord();
+    return cacheRowRecords();
   }
 
+  /** @return the first record of cached rows or null if there is no more data */
   @Override
   public RowRecord nextWithoutConstraint() throws IOException {
-    if (!hasCachedRow && !cacheRowRecord()) {
+    if (cachedRowRecords.isEmpty() && !cacheRowRecords()) {
       return null;
     }
-    hasCachedRow = false;
-    return cachedRowRecord;
+    return cachedRowRecords.remove(cachedRowRecords.size() - 1);
   }
 
   /**
-   * Cache row record
+   * Cache row records
    *
    * @return if there has next row record.
    */
-  private boolean cacheRowRecord() throws IOException {
-    while (timeGenerator.hasNext()) {
-      boolean hasField = false;
-      long timestamp = timeGenerator.next();
-      RowRecord rowRecord = new RowRecord(timestamp);
-
-      for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
-        Object value;
-        // get value from readers in time generator
-        if (cached.get(i)) {
-          value = timeGenerator.getValue(paths.get(i), timestamp);
-        } else {
-          // get value from series reader without filter
-          IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
-          value = reader.getValueInTimestamp(timestamp);
-        }
-        if (value == null) {
-          rowRecord.addField(null);
+  private boolean cacheRowRecords() throws IOException {
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[fetchSize];
+    // TODO: LIMIT constraint
+    // 1. fill time array from time Generator
+    while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    if (cachedTimeCnt == 0) {
+      return false;
+    }
+    RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+    }
+
+    boolean[] hasField = new boolean[cachedTimeCnt];
+    // 2. fetch results of each time series using time array
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      Object[] results;
+      // get value from readers in time generator
+      if (cached.get(i)) {
+        results = timeGenerator.getValues(paths.get(i));
+      } else {
+        results =
+            seriesReaderByTimestampList
+                .get(i)
+                .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
+      }
+
+      // 3. use values in results to fill row record
+      for (int j = 0; j < cachedTimeCnt; j++) {
+        if (results[j] == null) {
+          rowRecords[j].addField(null);
         } else {
-          hasField = true;
-          rowRecord.addField(value, dataTypes.get(i));
+          hasField[j] = true;
+          rowRecords[j].addField(results[j], dataTypes.get(i));
         }
       }
-      if (hasField) {
-        hasCachedRow = true;
-        cachedRowRecord = rowRecord;
-        break;
+    }
+    // 4. remove rowRecord if all values in one timestamp are null
+    // traverse in reverse order so rows can later be removed from the end of the list efficiently
+    for (int i = cachedTimeCnt - 1; i >= 0; i--) {
+      if (hasField[i]) {
+        cachedRowRecords.add(rowRecords[i]);
       }
     }
-    return hasCachedRow;
+
+    // 5. check whether there is next row record
+    if (cachedRowRecords.isEmpty() && timeGenerator.hasNext()) {
+      // Note: This may lead to a deep stack if many rowRecords are empty
+      return cacheRowRecords();
+    }
+    return !cachedRowRecords.isEmpty();
   }
 
   @Override
   public boolean hasNextRowInObjects() throws IOException {
-    if (hasCachedRow) {
+    if (!cachedRowInObjects.isEmpty()) {
       return true;
     }
     return cacheRowInObjects();
@@ -126,40 +151,67 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public Object[] nextRowInObjects() throws IOException {
-    if (!hasCachedRow && !cacheRowInObjects()) {
+    if (cachedRowInObjects.isEmpty() && !cacheRowInObjects()) {
       // values + timestamp
       return new Object[seriesReaderByTimestampList.size() + 1];
     }
-    hasCachedRow = false;
-    return cachedRowInObjects;
+
+    return cachedRowInObjects.remove(cachedRowInObjects.size() - 1);
   }
 
   private boolean cacheRowInObjects() throws IOException {
-    int seriesNumber = seriesReaderByTimestampList.size();
-    while (timeGenerator.hasNext()) {
-      boolean hasField = false;
-
-      Object[] rowInObjects = new Object[seriesNumber + 1];
-      long timestamp = timeGenerator.next();
-      rowInObjects[seriesNumber] = timestamp;
-
-      for (int i = 0; i < seriesNumber; i++) {
-        Object value =
-            cached.get(i)
-                ? timeGenerator.getValue(paths.get(i), timestamp)
-                : seriesReaderByTimestampList.get(i).getValueInTimestamp(timestamp);
-        if (value != null) {
-          hasField = true;
-          rowInObjects[i] = value;
-        }
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[fetchSize];
+
+    // TODO: LIMIT constraint
+    // 1. fill time array from time Generator
+    while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    if (cachedTimeCnt == 0) {
+      return false;
+    }
+
+    Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
+    }
+
+    boolean[] hasField = new boolean[cachedTimeCnt];
+    // 2. fetch results of each time series using time array
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      Object[] results;
+      // get value from readers in time generator
+      if (cached.get(i)) {
+        results = timeGenerator.getValues(paths.get(i));
+      } else {
+        results =
+            seriesReaderByTimestampList
+                .get(i)
+                .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
       }
 
-      if (hasField) {
-        hasCachedRow = true;
-        cachedRowInObjects = rowInObjects;
-        break;
+      // 3. use values in results to fill row record
+      for (int j = 0; j < cachedTimeCnt; j++) {
+        if (results[j] != null) {
+          hasField[j] = true;
+          rowsInObject[j][i] = results[j];
+        }
+      }
+    }
+    // 4. remove rowRecord if all values in one timestamp are null
+    // traverse in reverse order so rows can later be removed from the end of the list efficiently
+    for (int i = cachedTimeCnt - 1; i >= 0; i--) {
+      if (hasField[i]) {
+        cachedRowInObjects.add(rowsInObject[i]);
       }
     }
-    return hasCachedRow;
+
+    // 5. check whether there is next row record
+    if (cachedRowInObjects.isEmpty() && timeGenerator.hasNext()) {
+      // Note: This may lead to a deep stack if many rows are empty
+      return cacheRowInObjects();
+    }
+    return !cachedRowInObjects.isEmpty();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 4c54ac9..2416f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -58,8 +59,11 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
 @SuppressWarnings("java:S1135") // ignore todos
 public class AggregationExecutor {
 
@@ -336,27 +340,23 @@ public class AggregationExecutor {
    */
   public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
-    int index = 0;
-    for (; index < aggregations.size(); index++) {
-      String aggregationFunc = aggregations.get(index);
-      if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
-          && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
-        break;
-      }
-    }
-    if (index >= aggregations.size()) {
-      queryPlan.setAscending(false);
-      this.ascending = false;
-    }
+    optimizeLastElementFunc(queryPlan);
+
     TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
-    List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
+    // group by path name
+    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
+        groupAggregationsBySeries(selectedSeries);
+    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
     List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
     try {
       for (int i = 0; i < selectedSeries.size(); i++) {
         PartialPath path = selectedSeries.get(i);
-        IReaderByTimestamp seriesReaderByTimestamp =
-            getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-        readersOfSelectedSeries.add(seriesReaderByTimestamp);
+        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+        if (indexes != null) {
+          IReaderByTimestamp seriesReaderByTimestamp =
+              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+        }
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
@@ -364,15 +364,30 @@ public class AggregationExecutor {
 
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
-      TSDataType type = dataTypes.get(i);
       AggregateResult result =
-          AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
+          AggregateResultFactory.getAggrResultByName(
+              aggregations.get(i), dataTypes.get(i), ascending);
       aggregateResults.add(result);
     }
-    aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
+    aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
     return constructDataSet(aggregateResults, queryPlan);
   }
 
+  private void optimizeLastElementFunc(QueryPlan queryPlan) {
+    int index = 0;
+    for (; index < aggregations.size(); index++) {
+      String aggregationFunc = aggregations.get(index);
+      if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
+          && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
+        break;
+      }
+    }
+    if (index >= aggregations.size()) {
+      queryPlan.setAscending(false);
+      this.ascending = false;
+    }
+  }
+
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
       throws StorageEngineException {
     return new ServerTimeGenerator(expression, context, queryPlan);
@@ -395,8 +410,11 @@ public class AggregationExecutor {
   private void aggregateWithValueFilter(
       List<AggregateResult> aggregateResults,
       TimeGenerator timestampGenerator,
-      List<IReaderByTimestamp> readersOfSelectedSeries)
+      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
       throws IOException {
+    List<Boolean> cached =
+        markFilterdPaths(
+            expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());
 
     while (timestampGenerator.hasNext()) {
 
@@ -411,11 +429,26 @@ public class AggregationExecutor {
       }
 
       // cal part of aggregate result
-      for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
-        aggregateResults
-            .get(i)
-            .updateResultUsingTimestamps(
-                timeArray, timeArrayLength, readersOfSelectedSeries.get(i));
+      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
+        int pathId = entry.getValue().get(0);
+        // cache in timeGenerator
+        if (cached.get(pathId)) {
+          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+          for (Integer i : entry.getValue()) {
+            aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+          }
+        } else {
+          if (entry.getValue().size() == 1) {
+            aggregateResults
+                .get(entry.getValue().get(0))
+                .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+          } else {
+            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+            for (Integer i : entry.getValue()) {
+              aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+            }
+          }
+        }
       }
     }
   }
@@ -474,9 +507,7 @@ public class AggregationExecutor {
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       PartialPath series = selectedSeries.get(i);
-      List<Integer> indexList =
-          pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>());
-      indexList.add(i);
+      pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
     }
     return pathToAggrIndexesMap;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
index ba482c4..ffb8f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -32,34 +32,42 @@ public class ByTimestampReaderAdapter implements IReaderByTimestamp {
   // only cache the first point that >= timestamp
   private boolean hasCached;
   private TimeValuePair pair;
+  private long currentTime = Long.MIN_VALUE;
 
   public ByTimestampReaderAdapter(IPointReader pointReader) {
     this.pointReader = pointReader;
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
-    if (hasCached) {
-      if (pair.getTimestamp() < timestamp) {
-        hasCached = false;
-      } else if (pair.getTimestamp() == timestamp) {
-        hasCached = false;
-        return pair.getValue().getValue();
-      } else {
-        return null;
-      }
-    }
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+    Object[] result = new Object[length];
 
-    while (pointReader.hasNextTimeValuePair()) {
-      pair = pointReader.nextTimeValuePair();
-      if (pair.getTimestamp() == timestamp) {
-        return pair.getValue().getValue();
-      } else if (pair.getTimestamp() > timestamp) {
-        hasCached = true;
-        return null;
+    for (int i = 0; i < length; i++) {
+      if (timestamps[i] < currentTime) {
+        throw new IOException("time must be increasing when use ReaderByTimestamp");
+      }
+      currentTime = timestamps[i];
+      // check the cached point first: if it is at or after this timestamp, no read is needed
+      if (hasCached && pair.getTimestamp() >= currentTime) {
+        if (pair.getTimestamp() == currentTime) {
+          hasCached = false;
+          result[i] = pair.getValue().getValue();
+        }
+        continue;
+      }
+      // otherwise advance the point reader to the first point at or after this timestamp
+      while (pointReader.hasNextTimeValuePair()) {
+        pair = pointReader.nextTimeValuePair();
+        if (pair.getTimestamp() == currentTime) {
+          result[i] = pair.getValue().getValue();
+          break;
+        } else if (pair.getTimestamp() > currentTime) {
+          hasCached = true;
+          result[i] = null;
+          break;
+        }
       }
     }
-
-    return null;
+    return result;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
index bc87cf7..0d24c06 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
@@ -34,46 +34,49 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
 
   private ChunkReaderByTimestamp chunkReaderByTimestamp;
   private BatchData data;
+  private long currentTime = Long.MIN_VALUE;
 
   public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReaderByTimestamp) {
     this.chunkReaderByTimestamp = chunkReaderByTimestamp;
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+    Object[] result = new Object[length];
 
-    if (!hasNext()) {
-      return null;
-    }
-
-    while (data != null) {
-      Object value = data.getValueInTimestamp(timestamp);
-      if (value != null) {
-        return value;
+    for (int i = 0; i < length; i++) {
+      if (timestamps[i] < currentTime) {
+        throw new IOException("time must be increasing when use ReaderByTimestamp");
       }
-      if (data.hasCurrent()) {
-        return null;
-      } else {
-        chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-        if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
-          data = chunkReaderByTimestamp.nextPageData();
-        } else {
-          return null;
+      currentTime = timestamps[i];
+      while (hasNext()) {
+        data = next();
+        if (data.getMinTimestamp() > currentTime) {
+          result[i] = null;
+          break;
+        }
+        result[i] = data.getValueInTimestamp(currentTime);
+        // current page is exhausted: load the next satisfied page into the cache
+        if (!data.hasCurrent() && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+          data = next();
         }
       }
     }
+    return result;
+  }
 
-    return null;
+  private boolean hasCacheData() {
+    return data != null && data.hasCurrent();
   }
 
-  private boolean hasNext() throws IOException {
-    if (data != null && data.hasCurrent()) {
-      return true;
-    }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
-      data = chunkReaderByTimestamp.nextPageData();
-      return true;
+  private boolean hasNext() {
+    return hasCacheData() || chunkReaderByTimestamp.hasNextSatisfiedPage();
+  }
+
+  private BatchData next() throws IOException {
+    if (hasCacheData()) {
+      return data;
     }
-    return false;
+    return chunkReaderByTimestamp.nextPageData();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 57d4813..0db7bab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -34,7 +34,7 @@ public interface IReaderByTimestamp {
    * guarantee of correctness with any other way of calling. For example, DO NOT call this method
    * twice with the same timestamp.
    */
-  Object getValueInTimestamp(long timestamp) throws IOException;
+  Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException;
 
   /**
    * Returns whether there is no more data in reader.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index f09b980..69d3248 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -67,13 +67,22 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
-    seriesReader.setTimeFilter(timestamp);
-    if ((batchData == null || !hasAvailableData(batchData, timestamp)) && !hasNext(timestamp)) {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+    if (length <= 0) {
       return null;
     }
+    Object[] results = new Object[length];
+    seriesReader.setTimeFilter(timestamps[0]);
+    for (int i = 0; i < length; i++) {
+      if ((batchData == null || !hasAvailableData(batchData, timestamps[i]))
+          && !hasNext(timestamps[i])) {
+        // there is no more data: the remaining results stay null
+        break;
+      }
+      results[i] = batchData.getValueInTimestamp(timestamps[i]);
+    }
 
-    return batchData.getValueInTimestamp(timestamp);
+    return results;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4e99e90..6aac0c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -645,7 +645,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp = getQueryColumnHeaders(plan, username);
       }
       // create and cache dataset
-      QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
+      QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
         resp = getListDataSetHeaders(newDataSet);
       } else if (plan instanceof UDFPlan) {
@@ -1022,12 +1022,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   /** create QueryDataSet and buffer it for fetchResults */
-  private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan)
+  private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan, int fetchSize)
       throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
           IOException, MetadataException, SQLException, TException, InterruptedException {
 
     QueryContext context = genQueryContext(queryId);
     QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
+    queryDataSet.setFetchSize(fetchSize);
     queryId2DataSet.put(queryId, queryDataSet);
     return queryDataSet;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 4e71afa..d78d6d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -80,14 +80,18 @@ public class SeriesReaderByTimestampTest {
             null,
             true);
 
+    long timestamps[] = new long[500];
+    for (int i = 0; i < 500; i++) {
+      timestamps[i] = i;
+    }
+    Object[] values = seriesReader.getValuesInTimestamps(timestamps, timestamps.length);
     for (int time = 0; time < 500; time++) {
-      Integer value = (Integer) seriesReader.getValueInTimestamp(time);
       if (time < 200) {
-        Assert.assertEquals(time + 20000, value.intValue());
+        Assert.assertEquals(time + 20000, values[time]);
       } else if (time < 260 || (time >= 300 && time < 380) || (time >= 400)) {
-        Assert.assertEquals(time + 10000, value.intValue());
+        Assert.assertEquals(time + 10000, values[time]);
       } else {
-        Assert.assertEquals(time, value.intValue());
+        Assert.assertEquals(time, values[time]);
       }
     }
   }
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 7c0e506..a8d3baf 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -344,12 +344,12 @@ service TSDataService extends RaftService {
   long querySingleSeriesByTimestamp(1:SingleSeriesQueryRequest request)
 
    /**
-   * Fetch one value at given timestamp using the resultSetId generated by
+   * Fetch values at given timestamps using the resultSetId generated by
    * querySingleSeriesByTimestamp.
    * @return a ByteBuffer containing the serialized value or an empty buffer if there
    * are not more results.
    **/
-   binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp)
+   binary fetchSingleSeriesByTimestamps(1:Node header, 2:long readerId, 3:list<long> timestamps)
 
   /**
   * Find the local query established for the remote query and release all its resource.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
index 8985bcc..7d911c4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
@@ -72,7 +72,7 @@ public class DataSetWithTimeGenerator extends QueryDataSet {
 
       // get value from readers in time generator
       if (cached.get(i)) {
-        Object value = timeGenerator.getValue(paths.get(i), timestamp);
+        Object value = timeGenerator.getValue(paths.get(i));
         rowRecord.addField(value, dataTypes.get(i));
         continue;
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index eb7a206..c68a0e0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,7 @@ public abstract class QueryDataSet {
   protected int rowLimit = 0; // rowLimit > 0 means the LIMIT constraint exists
   protected int rowOffset = 0;
   protected int alreadyReturnedRowNum = 0;
+  protected int fetchSize = 10000;
   protected boolean ascending;
 
   public QueryDataSet() {}
@@ -81,6 +82,10 @@ public abstract class QueryDataSet {
     return nextWithoutConstraint();
   }
 
+  public void setFetchSize(int fetchSize) {
+    this.fetchSize = fetchSize;
+  }
+
   public abstract RowRecord nextWithoutConstraint() throws IOException;
 
   public List<Path> getPaths() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 78c0686..0da8b94 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -31,7 +31,9 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
 /**
  * All SingleSeriesExpression involved in a IExpression will be transferred to a TimeGenerator tree
@@ -41,7 +43,8 @@ import java.util.*;
  */
 public abstract class TimeGenerator {
 
-  private HashMap<Path, List<LeafNode>> leafCache = new HashMap<>();
+  private HashMap<Path, List<LeafNode>> leafNodeCache = new HashMap<>();
+  private HashMap<Path, List<Object>> leafValuesCache;
   protected Node operatorNode;
   private boolean hasOrNode;
 
@@ -50,18 +53,43 @@ public abstract class TimeGenerator {
   }
 
   public long next() throws IOException {
+    if (!hasOrNode) {
+      if (leafValuesCache == null) {
+        leafValuesCache = new HashMap<>();
+      }
+      leafNodeCache.forEach(
+          (path, nodes) ->
+              leafValuesCache
+                  .computeIfAbsent(path, k -> new ArrayList<>())
+                  .add(nodes.get(0).currentValue()));
+    }
     return operatorNode.next();
   }
 
-  public Object getValue(Path path, long time) {
-    for (LeafNode leafNode : leafCache.get(path)) {
-      if (!leafNode.currentTimeIs(time)) {
-        continue;
-      }
-      return leafNode.currentValue();
+  /** ATTENTION: this method should only be used when there is no `OR` node */
+  public Object[] getValues(Path path) throws IOException {
+    if (hasOrNode) {
+      throw new IOException(
+          "getValues() method should not be invoked when there is OR operator in where clause");
     }
+    if (leafValuesCache.get(path) == null) {
+      throw new IOException(
+          "getValues() method should not be invoked by non-existent path in where clause");
+    }
+    return leafValuesCache.remove(path).toArray();
+  }
 
-    return null;
+  /** ATTENTION: this method should only be used when there is no `OR` node */
+  public Object getValue(Path path) throws IOException {
+    if (hasOrNode) {
+      throw new IOException(
+          "getValue() method should not be invoked when there is OR operator in where clause");
+    }
+    if (leafValuesCache.get(path) == null) {
+      throw new IOException(
+          "getValue() method should not be invoked by non-existent path in where clause");
+    }
+    return leafValuesCache.get(path).remove(0);
   }
 
   public void constructNode(IExpression expression) throws IOException {
@@ -76,13 +104,9 @@ public abstract class TimeGenerator {
       IBatchReader seriesReader = generateNewBatchReader(singleSeriesExp);
       Path path = singleSeriesExp.getSeriesPath();
 
-      if (!leafCache.containsKey(path)) {
-        leafCache.put(path, new ArrayList<>());
-      }
-
       // put the current reader to valueCache
       LeafNode leafNode = new LeafNode(seriesReader);
-      leafCache.get(path).add(leafNode);
+      leafNodeCache.computeIfAbsent(path, p -> new ArrayList<>()).add(leafNode);
 
       return leafNode;
     } else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
index b90aafb..eff83b0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
@@ -26,8 +26,8 @@ public class AndNode implements Node {
   private Node leftChild;
   private Node rightChild;
 
-  private long cachedValue;
-  private boolean hasCachedValue;
+  private long cachedTime;
+  private boolean hasCachedTime;
   private boolean ascending = true;
 
   /**
@@ -39,20 +39,20 @@ public class AndNode implements Node {
   public AndNode(Node leftChild, Node rightChild) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedValue = false;
+    this.hasCachedTime = false;
   }
 
   public AndNode(Node leftChild, Node rightChild, boolean ascending) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedValue = false;
+    this.hasCachedTime = false;
     this.ascending = ascending;
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public boolean hasNext() throws IOException {
-    if (hasCachedValue) {
+    if (hasCachedTime) {
       return true;
     }
     if (leftChild.hasNext() && rightChild.hasNext()) {
@@ -69,8 +69,8 @@ public class AndNode implements Node {
     long rightValue = rightChild.next();
     while (true) {
       if (leftValue == rightValue) {
-        this.hasCachedValue = true;
-        this.cachedValue = leftValue;
+        this.hasCachedTime = true;
+        this.cachedTime = leftValue;
         return true;
       }
       if (seekRight.test(leftValue, rightValue)) {
@@ -92,8 +92,8 @@ public class AndNode implements Node {
   @Override
   public long next() throws IOException {
     if (hasNext()) {
-      hasCachedValue = false;
-      return cachedValue;
+      hasCachedTime = false;
+      return cachedTime;
     }
     throw new IOException("no more data");
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
index 6205030..a133cbb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
@@ -25,55 +25,55 @@ public class OrNode implements Node {
   private Node leftChild;
   private Node rightChild;
 
-  private boolean hasCachedLeftValue;
-  private long cachedLeftValue;
-  private boolean hasCachedRightValue;
-  private long cachedRightValue;
+  private boolean hasCachedLeftTime;
+  private long cachedLeftTime;
+  private boolean hasCachedRightTime;
+  private long cachedRightTime;
   private boolean ascending = true;
 
   public OrNode(Node leftChild, Node rightChild) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedLeftValue = false;
-    this.hasCachedRightValue = false;
+    this.hasCachedLeftTime = false;
+    this.hasCachedRightTime = false;
   }
 
   public OrNode(Node leftChild, Node rightChild, boolean ascending) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedLeftValue = false;
-    this.hasCachedRightValue = false;
+    this.hasCachedLeftTime = false;
+    this.hasCachedRightTime = false;
     this.ascending = ascending;
   }
 
   @Override
   public boolean hasNext() throws IOException {
-    if (hasCachedLeftValue || hasCachedRightValue) {
+    if (hasCachedLeftTime || hasCachedRightTime) {
       return true;
     }
     return leftChild.hasNext() || rightChild.hasNext();
   }
 
   private boolean hasLeftValue() throws IOException {
-    return hasCachedLeftValue || leftChild.hasNext();
+    return hasCachedLeftTime || leftChild.hasNext();
   }
 
   private long getLeftValue() throws IOException {
-    if (hasCachedLeftValue) {
-      hasCachedLeftValue = false;
-      return cachedLeftValue;
+    if (hasCachedLeftTime) {
+      hasCachedLeftTime = false;
+      return cachedLeftTime;
     }
     return leftChild.next();
   }
 
   private boolean hasRightValue() throws IOException {
-    return hasCachedRightValue || rightChild.hasNext();
+    return hasCachedRightTime || rightChild.hasNext();
   }
 
   private long getRightValue() throws IOException {
-    if (hasCachedRightValue) {
-      hasCachedRightValue = false;
-      return cachedRightValue;
+    if (hasCachedRightTime) {
+      hasCachedRightTime = false;
+      return cachedRightTime;
     }
     return rightChild.next();
   }
@@ -99,12 +99,12 @@ public class OrNode implements Node {
 
   private long popAndFillNextCache(boolean popLeft, boolean popRight, long left, long right) {
     if (popLeft) {
-      hasCachedRightValue = true;
-      cachedRightValue = right;
+      hasCachedRightTime = true;
+      cachedRightTime = right;
       return left;
     } else if (popRight) {
-      hasCachedLeftValue = true;
-      cachedLeftValue = left;
+      hasCachedLeftTime = true;
+      cachedLeftTime = left;
       return right;
     } else {
       return left;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 9b848dc..85b30c0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -35,19 +35,18 @@ public class FakedTimeGenerator extends TimeGenerator {
 
   public FakedTimeGenerator() throws IOException {
 
-    // or(and(d1.s1, d2.s2), d2.s2)
+    // and(and(d1.s1, d2.s2), d2.s2)
     IExpression expression =
-        BinaryExpression.or(
+        BinaryExpression.and(
             BinaryExpression.and(
                 new SingleSeriesExpression(
                     new Path("d1", "s1"),
-                    FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(5L))),
+                    FilterFactory.and(TimeFilter.gtEq(3L), TimeFilter.ltEq(8L))),
                 new SingleSeriesExpression(
                     new Path("d2", "s2"),
                     FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(10L)))),
             new SingleSeriesExpression(
-                new Path("d2", "s2"),
-                FilterFactory.and(TimeFilter.gtEq(11L), TimeFilter.ltEq(15L))));
+                new Path("d2", "s2"), FilterFactory.and(TimeFilter.gtEq(2L), TimeFilter.ltEq(6L))));
 
     super.constructNode(expression);
   }
@@ -68,10 +67,10 @@ public class FakedTimeGenerator extends TimeGenerator {
     Path path = new Path("d1", "s1");
     long count = 0;
     while (fakedTimeGenerator.hasNext()) {
-      long time = fakedTimeGenerator.next();
-      fakedTimeGenerator.getValue(path, time);
+      fakedTimeGenerator.next();
+      fakedTimeGenerator.getValue(path);
       count++;
     }
-    Assert.assertEquals(10L, count);
+    Assert.assertEquals(4L, count);
   }
 }