You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/11 09:01:21 UTC

[iotdb] branch clusterQueryOpt updated: implement cluster version

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
     new 2d6773f  implement cluster version
2d6773f is described below

commit 2d6773f2f640948200e37b3ba92c7201580159a4
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 11 17:00:42 2021 +0800

    implement cluster version
---
 .../iotdb/cluster/query/LocalQueryExecutor.java    | 15 ++---
 .../iotdb/cluster/query/reader/EmptyReader.java    |  8 +--
 .../cluster/query/reader/MergedReaderByTime.java   |  4 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      | 54 ++++++++---------
 .../iotdb/cluster/server/DataClusterServer.java    | 13 ++--
 .../cluster/server/service/DataAsyncService.java   | 10 +++-
 .../cluster/server/service/DataSyncService.java    |  5 +-
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  9 ++-
 .../cluster/common/TestManagedSeriesReader.java    | 25 +++++---
 .../query/manage/ClusterQueryManagerTest.java      |  2 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  | 69 ++++++++++++++--------
 .../cluster/server/member/DataGroupMemberTest.java | 30 ++++++----
 .../cluster/server/member/MetaGroupMemberTest.java |  8 ++-
 .../reader/series/SeriesReaderByTimestamp.java     | 11 ++--
 thrift/src/main/thrift/cluster.thrift              |  4 +-
 15 files changed, 157 insertions(+), 110 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 028b3fc..24c7847 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -102,24 +102,19 @@ public class LocalQueryExecutor {
     return ((CMManager) IoTDB.metaManager);
   }
 
-  /**
-   * Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer".
-   *
-   * @param readerId
-   * @param time
-   */
-  public ByteBuffer fetchSingleSeriesByTimestamp(long readerId, long time)
+  /** Return the values of reader "readerId" at the first "length" entries of "timestamps". */
+  public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
       throws ReaderNotFoundException, IOException {
     IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
     if (reader == null) {
       throw new ReaderNotFoundException(readerId);
     }
-    Object value = reader.getValueInTimestamp(time);
-    if (value != null) {
+    Object[] values = reader.getValuesInTimestamps(timestamps, length);
+    if (values != null) {
       ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
 
-      SerializeUtils.serializeObject(value, dataOutputStream);
+      SerializeUtils.serializeObjects(values, dataOutputStream);
       return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
     } else {
       return ByteBuffer.allocate(0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 8ead5bd..3b29a69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -155,12 +156,7 @@ public class EmptyReader extends BaseManagedSeriesReader
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) {
-    return null;
-  }
-
-  @Override
-  public Object[] getValuesInTimestamps(long[] timestamps) {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     return null;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..c1906e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -33,10 +33,10 @@ public class MergedReaderByTime implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     for (IReaderByTimestamp innerReader : innerReaders) {
       if (innerReader != null) {
-        Object valueInTimestamp = innerReader.getValueInTimestamp(timestamp);
+        Object[] valueInTimestamp = innerReader.getValuesInTimestamps(timestamps, length);
         if (valueInTimestamp != null) {
           return valueInTimestamp;
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index f2d8729..d077f02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
@@ -49,37 +51,42 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
     if (!sourceInfo.checkCurClient()) {
       return null;
     }
 
     ByteBuffer result;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      result = fetchResultAsync(timestamp);
+      result = fetchResultAsync(timestamps, length);
     } else {
-      result = fetchResultSync(timestamp);
+      result = fetchResultSync(timestamps, length);
     }
 
-    return SerializeUtils.deserializeObject(result);
+    return SerializeUtils.deserializeObjects(result);
   }
 
   @SuppressWarnings("java:S2274") // enable timeout
-  private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
+  private ByteBuffer fetchResultAsync(long[] timestamps, int length) throws IOException {
+    // convert long[] to List<Long>, which is used for thrift
+    List<Long> timestampList = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      timestampList.add(timestamps[i]);
+    }
     synchronized (fetchResult) {
       fetchResult.set(null);
       try {
         sourceInfo
             .getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeriesByTimestamp(
-                sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp, handler);
+            .fetchSingleSeriesByTimestamps(
+                sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList, handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
         // try other node
-        if (!sourceInfo.switchNode(true, timestamp)) {
+        if (!sourceInfo.switchNode(true, timestamps[0])) {
           return null;
         }
-        return fetchResultAsync(timestamp);
+        return fetchResultAsync(timestamps, length);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.warn("Query {} interrupted", sourceInfo);
@@ -89,36 +96,27 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
     return fetchResult.get();
   }
 
-  private ByteBuffer fetchResultSync(long timestamp) throws IOException {
+  private ByteBuffer fetchResultSync(long[] timestamps, int length) throws IOException {
     SyncDataClient curSyncClient = null;
+    // convert long[] to List<Long>, which is used for thrift
+    List<Long> timestampList = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      timestampList.add(timestamps[i]);
+    }
     try {
       curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
-      return curSyncClient.fetchSingleSeriesByTimestamp(
-          sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+      return curSyncClient.fetchSingleSeriesByTimestamps(
+          sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
     } catch (TException e) {
       // try other node
-      if (!sourceInfo.switchNode(true, timestamp)) {
+      if (!sourceInfo.switchNode(true, timestamps[0])) {
         return null;
       }
-      return fetchResultSync(timestamp);
+      return fetchResultSync(timestamps, length);
     } finally {
       if (curSyncClient != null) {
         ClientUtils.putBackSyncClient(curSyncClient);
       }
     }
   }
-
-  @Override
-  public Object[] getValuesInTimestamps(long[] timestamps) throws IOException {
-    return null;
-  }
-
-  @SuppressWarnings("java:S2274") // enable timeout
-  private ByteBuffer fetchResultAsync(long[] timestamps) throws IOException {
-    return null;
-  }
-
-  private ByteBuffer fetchResultSync(long[] timestamps) throws IOException {
-    return null;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 012569e..03a78a9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -384,12 +384,15 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     DataAsyncService service =
         getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
     if (service != null) {
-      service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+      service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
     }
   }
 
@@ -737,9 +740,9 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
       throws TException {
-    return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+    return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 41a2689..99fe858 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -207,13 +207,17 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       resultHandler.onComplete(
           dataGroupMember
               .getLocalQueryExecutor()
-              .fetchSingleSeriesByTimestamp(readerId, timestamp));
+              .fetchSingleSeriesByTimestamps(
+                  readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size()));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 61ae338..7e39689 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -209,12 +209,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
       throws TException {
     try {
       return dataGroupMember
           .getLocalQueryExecutor()
-          .fetchSingleSeriesByTimestamp(readerId, timestamp);
+          .fetchSingleSeriesByTimestamps(
+              readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size());
     } catch (ReaderNotFoundException | IOException e) {
       throw new TException(e);
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 661dfbf..5b2797a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -99,12 +99,15 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(
-      Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void fetchSingleSeriesByTimestamps(
+      Node header,
+      long readerId,
+      List<Long> timestamps,
+      AsyncMethodCallback<ByteBuffer> resultHandler) {
     new Thread(
             () ->
                 new DataAsyncService(dataGroupMemberMap.get(header))
-                    .fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler))
+                    .fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler))
         .start();
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
index 5ac3f64..6f220e4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestManagedSeriesReader.java
@@ -57,17 +57,24 @@ public class TestManagedSeriesReader implements ManagedSeriesReader, IReaderByTi
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) {
-    while (batchData.hasCurrent()) {
-      long currTime = batchData.currentTime();
-      if (currTime == timestamp) {
-        return batchData.currentValue();
-      } else if (currTime > timestamp) {
-        break;
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) {
+    Object[] results = new Object[length];
+    boolean hasValue = false;
+    for (int i = 0; i < length; i++) {
+      while (batchData.hasCurrent()) {
+        long currTime = batchData.currentTime();
+        if (currTime == timestamps[i]) {
+          hasValue = true;
+          results[i] = batchData.currentValue();
+          break;
+        } else if (currTime > timestamps[i]) {
+          results[i] = null;
+          break;
+        }
+        batchData.next();
       }
-      batchData.next();
     }
-    return null;
+    return hasValue ? results : null;
   }
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
index 01a7b8b..35a404e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManagerTest.java
@@ -79,7 +79,7 @@ public class ClusterQueryManagerTest {
 
   @Test
   public void testRegisterReaderByTime() {
-    IReaderByTimestamp reader = timestamp -> null;
+    IReaderByTimestamp reader = (timestamp, length) -> null;
     long id = queryManager.registerReaderByTime(reader);
     assertSame(reader, queryManager.getReaderByTimestamp(id));
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index a81dc66..b8a4706 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -45,11 +45,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class RemoteSeriesReaderByTimestampTest {
@@ -68,10 +68,10 @@ public class RemoteSeriesReaderByTimestampTest {
           public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
             return new AsyncDataClient(null, null, node, null) {
               @Override
-              public void fetchSingleSeriesByTimestamp(
+              public void fetchSingleSeriesByTimestamps(
                   Node header,
                   long readerId,
-                  long time,
+                  List<Long> timestamps,
                   AsyncMethodCallback<ByteBuffer> resultHandler)
                   throws TException {
                 if (failedNodes.contains(node)) {
@@ -84,23 +84,26 @@ public class RemoteSeriesReaderByTimestampTest {
                           DataOutputStream dataOutputStream =
                               new DataOutputStream(byteArrayOutputStream);
                           boolean isNull = true;
-                          while (batchData.hasCurrent()) {
-                            long currentTime = batchData.currentTime();
-                            Object value = batchData.currentValue();
-                            if (currentTime == time) {
-                              SerializeUtils.serializeObject(value, dataOutputStream);
+                          Object[] results = new Object[timestamps.size()];
+                          for (int i = 0; i < timestamps.size(); i++) {
+                            while (batchData.hasCurrent()) {
+                              long currentTime = batchData.currentTime();
+                              if (currentTime == timestamps.get(i)) {
+                                results[i] = batchData.currentValue();
+                                batchData.next();
+                                isNull = false;
+                                break;
+                              } else if (currentTime > timestamps.get(i)) {
+                                results[i] = null;
+                                break;
+                              }
+                              // time < timestamp, continue
                               batchData.next();
-                              isNull = false;
-                              break;
-                            } else if (currentTime > time) {
-                              break;
                             }
-                            // time < timestamp, continue
-                            batchData.next();
-                          }
-                          if (isNull) {
-                            SerializeUtils.serializeObject(null, dataOutputStream);
                           }
+                          SerializeUtils.serializeObjects(
+                              isNull ? new Object[0] : results, dataOutputStream);
+
                           resultHandler.onComplete(
                               ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
                         })
@@ -146,10 +149,16 @@ public class RemoteSeriesReaderByTimestampTest {
 
       RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
 
+      long[] times = new long[100];
       for (int i = 0; i < 100; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i] = i;
       }
-      assertNull(reader.getValueInTimestamp(101));
+      Object[] results = reader.getValuesInTimestamps(times, times.length);
+      for (int i = 0; i < 100; i++) {
+        assertEquals(i * 1.0, results[i]);
+      }
+      times[0] = 101;
+      assertEquals(0, reader.getValuesInTimestamps(times, 1).length);
     } finally {
       QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
@@ -180,8 +189,13 @@ public class RemoteSeriesReaderByTimestampTest {
               + (endTime - startTime));
       // normal read
       assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+      long[] times = new long[50];
+      for (int i = 0; i < 50; i++) {
+        times[i] = i;
+      }
+      Object[] results = reader.getValuesInTimestamps(times, 50);
       for (int i = 0; i < 50; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        assertEquals(i * 1.0, results[i]);
       }
 
       endTime = System.currentTimeMillis();
@@ -191,14 +205,22 @@ public class RemoteSeriesReaderByTimestampTest {
               + (endTime - startTime));
       failedNodes.add(TestUtils.getNode(0));
       for (int i = 50; i < 80; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i - 50] = i;
+      }
+      results = reader.getValuesInTimestamps(times, 30);
+      for (int i = 50; i < 80; i++) {
+        assertEquals(i * 1.0, results[i - 50]);
       }
       assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
 
       // a bad client, change to another node again
       failedNodes.add(TestUtils.getNode(1));
       for (int i = 80; i < 90; i++) {
-        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+        times[i - 80] = i;
+      }
+      results = reader.getValuesInTimestamps(times, 10);
+      for (int i = 80; i < 90; i++) {
+        assertEquals(i * 1.0, results[i - 80]);
       }
       assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
 
@@ -211,7 +233,8 @@ public class RemoteSeriesReaderByTimestampTest {
       failedNodes.add(TestUtils.getNode(2));
 
       try {
-        reader.getValueInTimestamp(90);
+        times[0] = 90;
+        reader.getValuesInTimestamps(times, 1);
         fail();
       } catch (IOException e) {
         assertEquals("no available client.", e.getMessage());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 84d069a..1a52082 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -803,11 +803,15 @@ public class DataGroupMemberTest extends MemberTest {
     AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
 
+    List<Long> timestamps = new ArrayList<>(5);
     for (int i = 5; i < 10; i++) {
-      new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
-      Object value = SerializeUtils.deserializeObject(dataResult.get());
-      assertEquals(i * 1.0, (Double) value, 0.00001);
+      timestamps.add((long) i);
+    }
+    new DataAsyncService(dataGroupMember)
+        .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+    Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+    for (int i = 5; i < 10; i++) {
+      assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
     }
 
     new DataAsyncService(dataGroupMember)
@@ -861,11 +865,15 @@ public class DataGroupMemberTest extends MemberTest {
 
     AtomicReference<ByteBuffer> dataResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0), dataResult);
+    List<Long> timestamps = new ArrayList<>(4);
     for (int i = 5; i < 9; i++) {
-      new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i, dataHandler);
-      Object value = SerializeUtils.deserializeObject(dataResult.get());
-      assertEquals(i * 1.0, (Double) value, 0.00001);
+      timestamps.add((long) i);
+    }
+    new DataAsyncService(dataGroupMember)
+        .fetchSingleSeriesByTimestamps(TestUtils.getNode(0), readerId, timestamps, dataHandler);
+    Object[] values = SerializeUtils.deserializeObjects(dataResult.get());
+    for (int i = 5; i < 9; i++) {
+      assertEquals(i * 1.0, (Double) values[i - 5], 0.00001);
     }
 
     new DataAsyncService(dataGroupMember)
@@ -896,11 +904,13 @@ public class DataGroupMemberTest extends MemberTest {
   public void testFetchWithoutQuery() {
     System.out.println("Start testFetchWithoutQuery()");
     AtomicReference<Exception> result = new AtomicReference<>();
+    List<Long> timestamps = new ArrayList<>(1);
+    timestamps.add((long) 0);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeriesByTimestamp(
+        .fetchSingleSeriesByTimestamps(
             TestUtils.getNode(0),
             0,
-            0,
+            timestamps,
             new AsyncMethodCallback<ByteBuffer>() {
               @Override
               public void onComplete(ByteBuffer buffer) {}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 08914d1..e4e6b25 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -881,6 +881,10 @@ public class MetaGroupMemberTest extends MemberTest {
 
     try {
       ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+      long[] times = new long[10];
+      for (int i = 0; i < 10; i++) {
+        times[i] = i;
+      }
       for (int i = 0; i < 10; i++) {
         IReaderByTimestamp readerByTimestamp =
             readerFactory.getReaderByTimestamp(
@@ -889,8 +893,10 @@ public class MetaGroupMemberTest extends MemberTest {
                 TSDataType.DOUBLE,
                 context,
                 true);
+
+        Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
         for (int j = 0; j < 10; j++) {
-          assertEquals(j * 1.0, (double) readerByTimestamp.getValueInTimestamp(j), 0.00001);
+          assertEquals(j * 1.0, (double) values[j], 0.00001);
         }
       }
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 00b6df3..8f0b04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -69,17 +69,18 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
   @Override
   public Object[] getValuesInTimestamps(long[] timestamp, int length) throws IOException {
     Object[] results = new Object[length];
-    for (int i = 0; i < length; i++) {
+    int timeIndex;
+    for (timeIndex = 0; timeIndex < length; timeIndex++) {
       seriesReader.setTimeFilter(timestamp[0]);
-      if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
-          && !hasNext(timestamp[i])) {
+      if ((batchData == null || !hasAvailableData(batchData, timestamp[timeIndex]))
+          && !hasNext(timestamp[timeIndex])) {
         // there is no more data
         break;
       }
-      results[i] = batchData.getValueInTimestamp(timestamp[i]);
+      results[timeIndex] = batchData.getValueInTimestamp(timestamp[timeIndex]);
     }
 
-    return results;
+    return timeIndex != 0 ? results : null;
   }
 
   @Override
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 7c0e506..a8d3baf 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -344,12 +344,12 @@ service TSDataService extends RaftService {
   long querySingleSeriesByTimestamp(1:SingleSeriesQueryRequest request)
 
    /**
-   * Fetch one value at given timestamp using the resultSetId generated by
+   * Fetch values at the given timestamps using the readerId generated by
    * querySingleSeriesByTimestamp.
    * @return a ByteBuffer containing the serialized value or an empty buffer if there
    * are not more results.
    **/
-   binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp)
+   binary fetchSingleSeriesByTimestamps(1:Node header, 2:long readerId, 3:list<long> timestamps)
 
   /**
   * Find the local query established for the remote query and release all its resource.