You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/10 08:12:27 UTC

[iotdb] branch clusterQueryOpt created (now 9ed42d1)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 9ed42d1  implement single node version

This branch includes the following new commits:

     new 5e41549  design and pseudo code
     new 9ed42d1  implement single node version

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: design and pseudo code

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e415497a9e7a513eb7d290a562c7f3ae014991e
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 9 10:18:18 2021 +0800

    design and pseudo code
---
 .../cluster/query/reader/ClusterReaderFactory.java |  1 +
 .../dataset/RawQueryDataSetWithValueFilter.java    |  8 +--
 .../RemoteRawQueryDataSetWithValueFilter.java      | 68 ++++++++++++++++++++++
 .../db/query/reader/series/IReaderByTimestamp.java |  6 ++
 .../reader/series/SeriesReaderByTimestamp.java     | 16 +++++
 5 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 9b78175..e77d7cd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -217,6 +217,7 @@ public class ClusterReaderFactory {
       boolean ascending)
       throws StorageEngineException, EmptyIntervalException {
     // make sure the partition table is new
+    // TODO: don't need to sync metadata for every reader
     try {
       metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index bf48f10..e5349dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -31,11 +31,11 @@ import java.util.List;
 
 public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFInputDataSet {
 
-  private final TimeGenerator timeGenerator;
-  private final List<IReaderByTimestamp> seriesReaderByTimestampList;
-  private final List<Boolean> cached;
+  protected final TimeGenerator timeGenerator;
+  protected final List<IReaderByTimestamp> seriesReaderByTimestampList;
+  protected final List<Boolean> cached;
 
-  private boolean hasCachedRow;
+  protected boolean hasCachedRow;
   private RowRecord cachedRowRecord;
   private Object[] cachedRowInObjects;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
new file mode 100644
index 0000000..c93e139
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.db.query.dataset;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RemoteRawQueryDataSetWithValueFilter extends RawQueryDataSetWithValueFilter {
+
+  private List<RowRecord> cachedRowRecords = new ArrayList<>();
+  private Object[] objects;
+  private boolean[] isAllNull;
+  /**
+   * constructor of EngineDataSetWithValueFilter.
+   *
+   * @param paths paths in List structure
+   * @param dataTypes time series data type
+   * @param timeGenerator EngineTimeGenerator object
+   * @param readers readers in List(IReaderByTimeStamp) structure
+   * @param cached
+   * @param ascending specifies how the data should be sorted,'True' means read in ascending time
+   */
+  public RemoteRawQueryDataSetWithValueFilter(
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      TimeGenerator timeGenerator,
+      List<IReaderByTimestamp> readers,
+      List<Boolean> cached,
+      boolean ascending) {
+    super(paths, dataTypes, timeGenerator, readers, cached, ascending);
+  }
+
+  /**
+   * Cache row record
+   *
+   * @return if there has next row record.
+   */
+  private boolean cacheRowRecord() throws IOException {
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[MAX_TIME_NUM];
+    // TODO: LIMIT constraint
+    while (timeGenerator.hasNext() && cachedTimeCnt < MAX_TIME_NUM) {
+      // 1. fill time array from time Generator
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      // 2. fetch results of each time series from readers using time array
+      Object[] results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+      // 3. use values in results to fill row record
+      for (int j = 0; j < MAX_TIME_NUM; j++) {
+        if (i == 0) {
+          RowRecord rowRecord = new RowRecord(cachedTimeArray[]);
+        }
+        fillRowRecord();
+        if (results[j] != null) {
+          isAllNull = false;
+        }
+      }
+    }
+    // 4. remove rowRecord if all values in one timestamp are null
+    removeNonExistRecord();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 57d4813..28a5ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -37,6 +37,12 @@ public interface IReaderByTimestamp {
   Object getValueInTimestamp(long timestamp) throws IOException;
 
   /**
+   * Returns all the corresponding values under the array of timestamp. Returns null if no value
+   * under one timestamp.
+   */
+  Object[] getValueInTimestamps(long[] timestamps) throws IOException;
+
+  /**
    * Returns whether there is no more data in reader.
    *
    * <p>True means no more data. False means you can still get more data
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index f09b980..551c890 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -77,6 +77,22 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
+  public Object[] getValueInTimestamps(long[] timestamp) throws IOException {
+    seriesReader.setTimeFilter(timestamp[0]);
+    Object[] results = new Object[timestamp.length];
+    for (int i = 0; i < timestamp.length; i++) {
+      if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
+          && !hasNext(timestamp[i])) {
+        // there is no more data
+        break;
+      }
+      results[i] = batchData.getValueInTimestamp(timestamp[i]);
+    }
+
+    return results;
+  }
+
+  @Override
   public boolean readerIsEmpty() throws IOException {
     return seriesReader.isEmpty() && isEmpty(batchData);
   }


[iotdb] 02/02: implement single node version

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ed42d1fc7c1857325eb14bc6b81246129dff805
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Mar 10 16:10:38 2021 +0800

    implement single node version
---
 .../iotdb/cluster/query/reader/EmptyReader.java    |   5 +
 .../reader/RemoteSeriesReaderByTimestamp.java      |  14 ++
 .../dataset/RawQueryDataSetWithValueFilter.java    | 157 ++++++++++++---------
 .../RemoteRawQueryDataSetWithValueFilter.java      |  68 ---------
 .../adapter/ByTimestampReaderAdapter.java          |   5 +
 .../reader/chunk/DiskChunkReaderByTimestamp.java   |   5 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   5 +-
 .../query/dataset/DataSetWithTimeGenerator.java    |   2 +-
 .../tsfile/read/query/dataset/QueryDataSet.java    |   5 +
 .../read/query/timegenerator/TimeGenerator.java    |  38 +++--
 .../read/query/timegenerator/node/AndNode.java     |  18 +--
 .../read/query/timegenerator/node/OrNode.java      |  42 +++---
 .../tsfile/read/reader/FakedTimeGenerator.java     |  15 +-
 13 files changed, 194 insertions(+), 185 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 933a9ab..6db6445 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -160,6 +160,11 @@ public class EmptyReader extends BaseManagedSeriesReader
   }
 
   @Override
+  public Object[] getValueInTimestamps(long[] timestamps) {
+    return null;
+  }
+
+  @Override
   public boolean readerIsEmpty() {
     return false;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 2b68ef4..4a32acc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -107,4 +107,18 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       }
     }
   }
+
+  @Override
+  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
+    return null;
+  }
+
+  @SuppressWarnings("java:S2274") // enable timeout
+  private ByteBuffer fetchResultAsync(long[] timestamps) throws IOException {
+    return null;
+  }
+
+  private ByteBuffer fetchResultSync(long[] timestamps) throws IOException {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index e5349dc..3fb14f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -31,13 +31,14 @@ import java.util.List;
 
 public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFInputDataSet {
 
-  protected final TimeGenerator timeGenerator;
-  protected final List<IReaderByTimestamp> seriesReaderByTimestampList;
-  protected final List<Boolean> cached;
+  private final TimeGenerator timeGenerator;
+  private final List<IReaderByTimestamp> seriesReaderByTimestampList;
+  private final List<Boolean> cached;
 
-  protected boolean hasCachedRow;
-  private RowRecord cachedRowRecord;
-  private Object[] cachedRowInObjects;
+  private List<RowRecord> cachedRowRecords = new ArrayList<>();
+
+  /** Used for UDF. */
+  private List<Object[]> cachedRowInObjects = new ArrayList<>();
 
   /**
    * constructor of EngineDataSetWithValueFilter.
@@ -64,61 +65,78 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public boolean hasNextWithoutConstraint() throws IOException {
-    if (hasCachedRow) {
+    if (!cachedRowRecords.isEmpty()) {
       return true;
     }
-    return cacheRowRecord();
+    return cacheRowRecords();
   }
 
+  /** @return the first record of cached rows or null if there is no more data */
   @Override
   public RowRecord nextWithoutConstraint() throws IOException {
-    if (!hasCachedRow && !cacheRowRecord()) {
+    if (cachedRowRecords.isEmpty() && !cacheRowRecords()) {
       return null;
     }
-    hasCachedRow = false;
-    return cachedRowRecord;
+
+    return cachedRowRecords.remove(0);
   }
 
   /**
-   * Cache row record
+   * Cache row records
    *
    * @return if there has next row record.
    */
-  private boolean cacheRowRecord() throws IOException {
-    while (timeGenerator.hasNext()) {
-      boolean hasField = false;
-      long timestamp = timeGenerator.next();
-      RowRecord rowRecord = new RowRecord(timestamp);
-
-      for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
-        Object value;
-        // get value from readers in time generator
-        if (cached.get(i)) {
-          value = timeGenerator.getValue(paths.get(i), timestamp);
-        } else {
-          // get value from series reader without filter
-          IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
-          value = reader.getValueInTimestamp(timestamp);
-        }
-        if (value == null) {
-          rowRecord.addField(null);
+  private boolean cacheRowRecords() throws IOException {
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[fetchSize];
+    // TODO: LIMIT constraint
+    // 1. fill time array from time Generator
+    while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    if (cachedTimeCnt == 0) {
+      return false;
+    }
+    RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+    }
+
+    boolean[] hasField = new boolean[cachedTimeCnt];
+    // 2. fetch results of each time series using time array
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      Object[] results;
+      // get value from readers in time generator
+      if (cached.get(i)) {
+        results = timeGenerator.getValues(paths.get(i));
+      } else {
+        results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+      }
+
+      // 3. use values in results to fill row record
+      for (int j = 0; j < cachedTimeCnt; j++) {
+        if (results[j] == null) {
+          rowRecords[j].addField(null);
         } else {
-          hasField = true;
-          rowRecord.addField(value, dataTypes.get(i));
+          hasField[j] = true;
+          rowRecords[j].addField(results[j], dataTypes.get(i));
         }
       }
-      if (hasField) {
-        hasCachedRow = true;
-        cachedRowRecord = rowRecord;
-        break;
+    }
+    // 4. remove rowRecord if all values in one timestamp are null
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      if (hasField[i]) {
+        cachedRowRecords.add(rowRecords[i]);
       }
     }
-    return hasCachedRow;
+
+    // 5. check whether there is next row record
+    return !cachedRowRecords.isEmpty();
   }
 
   @Override
   public boolean hasNextRowInObjects() throws IOException {
-    if (hasCachedRow) {
+    if (!cachedRowRecords.isEmpty()) {
       return true;
     }
     return cacheRowInObjects();
@@ -126,40 +144,53 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public Object[] nextRowInObjects() throws IOException {
-    if (!hasCachedRow && !cacheRowInObjects()) {
+    if (cachedRowRecords.isEmpty() && !cacheRowInObjects()) {
       // values + timestamp
       return new Object[seriesReaderByTimestampList.size() + 1];
     }
-    hasCachedRow = false;
-    return cachedRowInObjects;
+
+    return cachedRowInObjects.remove(0);
   }
 
   private boolean cacheRowInObjects() throws IOException {
-    int seriesNumber = seriesReaderByTimestampList.size();
-    while (timeGenerator.hasNext()) {
-      boolean hasField = false;
-
-      Object[] rowInObjects = new Object[seriesNumber + 1];
-      long timestamp = timeGenerator.next();
-      rowInObjects[seriesNumber] = timestamp;
-
-      for (int i = 0; i < seriesNumber; i++) {
-        Object value =
-            cached.get(i)
-                ? timeGenerator.getValue(paths.get(i), timestamp)
-                : seriesReaderByTimestampList.get(i).getValueInTimestamp(timestamp);
-        if (value != null) {
-          hasField = true;
-          rowInObjects[i] = value;
-        }
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[fetchSize];
+
+    // TODO: LIMIT constraint
+    // 1. fill time array from time Generator
+    while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
+    }
+
+    boolean[] hasField = new boolean[cachedTimeCnt];
+    // 2. fetch results of each time series using time array
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      Object[] results;
+      // get value from readers in time generator
+      if (cached.get(i)) {
+        results = timeGenerator.getValues(paths.get(i));
+      } else {
+        results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
       }
 
-      if (hasField) {
-        hasCachedRow = true;
-        cachedRowInObjects = rowInObjects;
-        break;
+      // 3. use values in results to fill row record
+      for (int j = 0; j < cachedTimeCnt; j++) {
+        if (results[j] != null) hasField[i] = true;
+        rowsInObject[j][i] = results[j];
       }
     }
-    return hasCachedRow;
+    // 4. remove rowRecord if all values in one timestamp are null
+    for (int i = 0; i < cachedTimeCnt; i++) {
+      if (hasField[i]) {
+        cachedRowInObjects.add(rowsInObject[i]);
+      }
+    }
+
+    // 5. check whether there is next row record
+    return !cachedRowRecords.isEmpty();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
deleted file mode 100644
index c93e139..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.iotdb.db.query.dataset;
-
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class RemoteRawQueryDataSetWithValueFilter extends RawQueryDataSetWithValueFilter {
-
-  private List<RowRecord> cachedRowRecords = new ArrayList<>();
-  private Object[] objects;
-  private boolean[] isAllNull;
-  /**
-   * constructor of EngineDataSetWithValueFilter.
-   *
-   * @param paths paths in List structure
-   * @param dataTypes time series data type
-   * @param timeGenerator EngineTimeGenerator object
-   * @param readers readers in List(IReaderByTimeStamp) structure
-   * @param cached
-   * @param ascending specifies how the data should be sorted,'True' means read in ascending time
-   */
-  public RemoteRawQueryDataSetWithValueFilter(
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      TimeGenerator timeGenerator,
-      List<IReaderByTimestamp> readers,
-      List<Boolean> cached,
-      boolean ascending) {
-    super(paths, dataTypes, timeGenerator, readers, cached, ascending);
-  }
-
-  /**
-   * Cache row record
-   *
-   * @return if there has next row record.
-   */
-  private boolean cacheRowRecord() throws IOException {
-    int cachedTimeCnt = 0;
-    long[] cachedTimeArray = new long[MAX_TIME_NUM];
-    // TODO: LIMIT constraint
-    while (timeGenerator.hasNext() && cachedTimeCnt < MAX_TIME_NUM) {
-      // 1. fill time array from time Generator
-      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
-    }
-    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
-      // 2. fetch results of each time series from readers using time array
-      Object[] results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
-      // 3. use values in results to fill row record
-      for (int j = 0; j < MAX_TIME_NUM; j++) {
-        if (i == 0) {
-          RowRecord rowRecord = new RowRecord(cachedTimeArray[]);
-        }
-        fillRowRecord();
-        if (results[j] != null) {
-          isAllNull = false;
-        }
-      }
-    }
-    // 4. remove rowRecord if all values in one timestamp are null
-    removeNonExistRecord();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
index ba482c4..befcc08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -62,4 +62,9 @@ public class ByTimestampReaderAdapter implements IReaderByTimestamp {
 
     return null;
   }
+
+  @Override
+  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
index bc87cf7..5095ccd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
@@ -66,6 +66,11 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
     return null;
   }
 
+  @Override
+  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
+    return null;
+  }
+
   private boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad60bfc..9004b7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -677,7 +677,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp = getQueryColumnHeaders(plan, username);
       }
       // create and cache dataset
-      QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
+      QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
         resp = getListDataSetHeaders(newDataSet);
       } else if (plan instanceof UDFPlan) {
@@ -1025,12 +1025,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   /** create QueryDataSet and buffer it for fetchResults */
-  private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan)
+  private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan, int fetchSize)
       throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
           IOException, MetadataException, SQLException, TException, InterruptedException {
 
     QueryContext context = genQueryContext(queryId);
     QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
+    queryDataSet.setFetchSize(fetchSize);
     queryId2DataSet.put(queryId, queryDataSet);
     return queryDataSet;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
index 8985bcc..7d911c4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
@@ -72,7 +72,7 @@ public class DataSetWithTimeGenerator extends QueryDataSet {
 
       // get value from readers in time generator
       if (cached.get(i)) {
-        Object value = timeGenerator.getValue(paths.get(i), timestamp);
+        Object value = timeGenerator.getValue(paths.get(i));
         rowRecord.addField(value, dataTypes.get(i));
         continue;
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index eb7a206..c68a0e0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,7 @@ public abstract class QueryDataSet {
   protected int rowLimit = 0; // rowLimit > 0 means the LIMIT constraint exists
   protected int rowOffset = 0;
   protected int alreadyReturnedRowNum = 0;
+  protected int fetchSize = 10000;
   protected boolean ascending;
 
   public QueryDataSet() {}
@@ -81,6 +82,10 @@ public abstract class QueryDataSet {
     return nextWithoutConstraint();
   }
 
+  public void setFetchSize(int fetchSize) {
+    this.fetchSize = fetchSize;
+  }
+
   public abstract RowRecord nextWithoutConstraint() throws IOException;
 
   public List<Path> getPaths() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 78c0686..2ea5fb8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -31,7 +31,9 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
 /**
  * All SingleSeriesExpression involved in a IExpression will be transferred to a TimeGenerator tree
@@ -41,7 +43,8 @@ import java.util.*;
  */
 public abstract class TimeGenerator {
 
-  private HashMap<Path, List<LeafNode>> leafCache = new HashMap<>();
+  private HashMap<Path, List<LeafNode>> leafNodeCache = new HashMap<>();
+  private HashMap<Path, List<Object>> leafValuesCache;
   protected Node operatorNode;
   private boolean hasOrNode;
 
@@ -50,18 +53,27 @@ public abstract class TimeGenerator {
   }
 
   public long next() throws IOException {
+    if (!hasOrNode) {
+      if (leafValuesCache == null) {
+        leafValuesCache = new HashMap<>();
+      }
+      leafNodeCache.forEach(
+          (path, nodes) ->
+              leafValuesCache
+                  .computeIfAbsent(path, k -> new ArrayList<>())
+                  .add(nodes.get(0).currentValue()));
+    }
     return operatorNode.next();
   }
 
-  public Object getValue(Path path, long time) {
-    for (LeafNode leafNode : leafCache.get(path)) {
-      if (!leafNode.currentTimeIs(time)) {
-        continue;
-      }
-      return leafNode.currentValue();
-    }
+  /** ATTENTION: this method should only be used when there is no `OR` node */
+  public Object[] getValues(Path path) {
+    return leafValuesCache.get(path) == null ? null : leafValuesCache.remove(path).toArray();
+  }
 
-    return null;
+  /** ATTENTION: this method should only be used when there is no `OR` node */
+  public Object getValue(Path path) {
+    return leafValuesCache.get(path) == null ? null : leafValuesCache.get(path).remove(0);
   }
 
   public void constructNode(IExpression expression) throws IOException {
@@ -76,13 +88,13 @@ public abstract class TimeGenerator {
       IBatchReader seriesReader = generateNewBatchReader(singleSeriesExp);
       Path path = singleSeriesExp.getSeriesPath();
 
-      if (!leafCache.containsKey(path)) {
-        leafCache.put(path, new ArrayList<>());
+      if (!leafNodeCache.containsKey(path)) {
+        leafNodeCache.put(path, new ArrayList<>());
       }
 
       // put the current reader to valueCache
       LeafNode leafNode = new LeafNode(seriesReader);
-      leafCache.get(path).add(leafNode);
+      leafNodeCache.get(path).add(leafNode);
 
       return leafNode;
     } else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
index b90aafb..eff83b0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
@@ -26,8 +26,8 @@ public class AndNode implements Node {
   private Node leftChild;
   private Node rightChild;
 
-  private long cachedValue;
-  private boolean hasCachedValue;
+  private long cachedTime;
+  private boolean hasCachedTime;
   private boolean ascending = true;
 
   /**
@@ -39,20 +39,20 @@ public class AndNode implements Node {
   public AndNode(Node leftChild, Node rightChild) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedValue = false;
+    this.hasCachedTime = false;
   }
 
   public AndNode(Node leftChild, Node rightChild, boolean ascending) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedValue = false;
+    this.hasCachedTime = false;
     this.ascending = ascending;
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public boolean hasNext() throws IOException {
-    if (hasCachedValue) {
+    if (hasCachedTime) {
       return true;
     }
     if (leftChild.hasNext() && rightChild.hasNext()) {
@@ -69,8 +69,8 @@ public class AndNode implements Node {
     long rightValue = rightChild.next();
     while (true) {
       if (leftValue == rightValue) {
-        this.hasCachedValue = true;
-        this.cachedValue = leftValue;
+        this.hasCachedTime = true;
+        this.cachedTime = leftValue;
         return true;
       }
       if (seekRight.test(leftValue, rightValue)) {
@@ -92,8 +92,8 @@ public class AndNode implements Node {
   @Override
   public long next() throws IOException {
     if (hasNext()) {
-      hasCachedValue = false;
-      return cachedValue;
+      hasCachedTime = false;
+      return cachedTime;
     }
     throw new IOException("no more data");
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
index 6205030..a133cbb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
@@ -25,55 +25,55 @@ public class OrNode implements Node {
   private Node leftChild;
   private Node rightChild;
 
-  private boolean hasCachedLeftValue;
-  private long cachedLeftValue;
-  private boolean hasCachedRightValue;
-  private long cachedRightValue;
+  private boolean hasCachedLeftTime;
+  private long cachedLeftTime;
+  private boolean hasCachedRightTime;
+  private long cachedRightTime;
   private boolean ascending = true;
 
   public OrNode(Node leftChild, Node rightChild) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedLeftValue = false;
-    this.hasCachedRightValue = false;
+    this.hasCachedLeftTime = false;
+    this.hasCachedRightTime = false;
   }
 
   public OrNode(Node leftChild, Node rightChild, boolean ascending) {
     this.leftChild = leftChild;
     this.rightChild = rightChild;
-    this.hasCachedLeftValue = false;
-    this.hasCachedRightValue = false;
+    this.hasCachedLeftTime = false;
+    this.hasCachedRightTime = false;
     this.ascending = ascending;
   }
 
   @Override
   public boolean hasNext() throws IOException {
-    if (hasCachedLeftValue || hasCachedRightValue) {
+    if (hasCachedLeftTime || hasCachedRightTime) {
       return true;
     }
     return leftChild.hasNext() || rightChild.hasNext();
   }
 
   private boolean hasLeftValue() throws IOException {
-    return hasCachedLeftValue || leftChild.hasNext();
+    return hasCachedLeftTime || leftChild.hasNext();
   }
 
   private long getLeftValue() throws IOException {
-    if (hasCachedLeftValue) {
-      hasCachedLeftValue = false;
-      return cachedLeftValue;
+    if (hasCachedLeftTime) {
+      hasCachedLeftTime = false;
+      return cachedLeftTime;
     }
     return leftChild.next();
   }
 
   private boolean hasRightValue() throws IOException {
-    return hasCachedRightValue || rightChild.hasNext();
+    return hasCachedRightTime || rightChild.hasNext();
   }
 
   private long getRightValue() throws IOException {
-    if (hasCachedRightValue) {
-      hasCachedRightValue = false;
-      return cachedRightValue;
+    if (hasCachedRightTime) {
+      hasCachedRightTime = false;
+      return cachedRightTime;
     }
     return rightChild.next();
   }
@@ -99,12 +99,12 @@ public class OrNode implements Node {
 
   private long popAndFillNextCache(boolean popLeft, boolean popRight, long left, long right) {
     if (popLeft) {
-      hasCachedRightValue = true;
-      cachedRightValue = right;
+      hasCachedRightTime = true;
+      cachedRightTime = right;
       return left;
     } else if (popRight) {
-      hasCachedLeftValue = true;
-      cachedLeftValue = left;
+      hasCachedLeftTime = true;
+      cachedLeftTime = left;
       return right;
     } else {
       return left;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 9b848dc..85b30c0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -35,19 +35,18 @@ public class FakedTimeGenerator extends TimeGenerator {
 
   public FakedTimeGenerator() throws IOException {
 
-    // or(and(d1.s1, d2.s2), d2.s2)
+    // and(and(d1.s1, d2.s2), d2.s2)
     IExpression expression =
-        BinaryExpression.or(
+        BinaryExpression.and(
             BinaryExpression.and(
                 new SingleSeriesExpression(
                     new Path("d1", "s1"),
-                    FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(5L))),
+                    FilterFactory.and(TimeFilter.gtEq(3L), TimeFilter.ltEq(8L))),
                 new SingleSeriesExpression(
                     new Path("d2", "s2"),
                     FilterFactory.and(TimeFilter.gtEq(1L), TimeFilter.ltEq(10L)))),
             new SingleSeriesExpression(
-                new Path("d2", "s2"),
-                FilterFactory.and(TimeFilter.gtEq(11L), TimeFilter.ltEq(15L))));
+                new Path("d2", "s2"), FilterFactory.and(TimeFilter.gtEq(2L), TimeFilter.ltEq(6L))));
 
     super.constructNode(expression);
   }
@@ -68,10 +67,10 @@ public class FakedTimeGenerator extends TimeGenerator {
     Path path = new Path("d1", "s1");
     long count = 0;
     while (fakedTimeGenerator.hasNext()) {
-      long time = fakedTimeGenerator.next();
-      fakedTimeGenerator.getValue(path, time);
+      fakedTimeGenerator.next();
+      fakedTimeGenerator.getValue(path);
       count++;
     }
-    Assert.assertEquals(10L, count);
+    Assert.assertEquals(4L, count);
   }
 }