You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/11 02:46:15 UTC

[incubator-iotdb] branch master updated: [IOTDB-540] Accelerate Previous Fill (#931)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new faf4924  [IOTDB-540] Accelerate Previous Fill (#931)
faf4924 is described below

commit faf4924c328d94f49f58c5caedd0e25e4c073024
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Sat Apr 11 10:46:03 2020 +0800

    [IOTDB-540] Accelerate Previous Fill (#931)
    
    * Reimplement PreviousFill
---
 .../resources/conf/iotdb-engine.properties         |   4 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   4 +-
 .../db/engine/storagegroup/TsFileResource.java     |   2 +-
 .../iotdb/db/query/executor/FillQueryExecutor.java |  12 +-
 .../java/org/apache/iotdb/db/query/fill/IFill.java |  19 +-
 .../org/apache/iotdb/db/query/fill/LinearFill.java |  27 +-
 .../apache/iotdb/db/query/fill/PreviousFill.java   | 222 +++++++++++++--
 .../db/query/reader/chunk/MemChunkReader.java      |   1 -
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |   2 +-
 .../apache/iotdb/db/integration/IoTDBFillIT.java   | 316 +++++++++++++++++++--
 site/src/main/.vuepress/config.js                  |   2 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |  12 +
 12 files changed, 548 insertions(+), 75 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e0a13d7..b388823 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -238,8 +238,8 @@ upgrade_thread_num=1
 ### Query Configurations
 ####################
 
-# the default time period that used in fill query, 10min by default
-default_fill_interval=600000
+# the default time period used in fill queries; -1 (the default) means infinite past time
+default_fill_interval=-1
 
 ####################
 ### Merge Configurations
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fa3013a..a1dee4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -510,9 +510,9 @@ public class IoTDBConfig {
   private int memtableNumInEachStorageGroup = 10;
 
   /**
-   * the default fill interval in LinearFill and PreviousFill, 10min
+   * the default fill interval in LinearFill and PreviousFill, -1 means infinite past time
    */
-  private int defaultFillInterval = 600000;
+  private int defaultFillInterval = -1;
 
   /**
    * default TTL for storage groups that are not set TTL by statements, in ms
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ce20b85..43eac5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -280,7 +280,7 @@ public class TsFileResource {
   }
 
   public List<ChunkMetadata> getChunkMetadataList() {
-    return chunkMetadataList;
+    return new ArrayList<>(chunkMetadataList);
   }
 
   public List<ReadOnlyMemChunk> getReadOnlyMemChunk() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 48e2a86..6b823c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -77,8 +77,6 @@ public class FillQueryExecutor {
           case INT64:
           case FLOAT:
           case DOUBLE:
-            fill = new LinearFill(dataType, queryTime, defaultFillInterval, defaultFillInterval);
-            break;
           case BOOLEAN:
           case TEXT:
             fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
@@ -89,7 +87,8 @@ public class FillQueryExecutor {
       } else {
         fill = typeIFillMap.get(dataType).copy();
       }
-      configureFill(fill, dataType, path, fillQueryPlan.getAllMeasurementsInDevice(path.getDevice()), context, queryTime);
+      fill.configureFill(path, dataType, queryTime,
+          fillQueryPlan.getAllMeasurementsInDevice(path.getDevice()), context);
 
       TimeValuePair timeValuePair = fill.getFillResult();
       if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -103,11 +102,4 @@ public class FillQueryExecutor {
     dataSet.setRecord(record);
     return dataSet;
   }
-
-  protected void configureFill(IFill fill, TSDataType dataType, Path path, Set<String> allSensors, QueryContext context,
-                               long queryTime) throws StorageEngineException, QueryProcessException {
-    fill.setDataType(dataType);
-    fill.setQueryTime(queryTime);
-    fill.constructReaders(path, allSensors, context);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index 1f3f686..675fed8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -23,13 +23,10 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 
 import java.io.IOException;
 import java.util.Set;
@@ -39,8 +36,6 @@ public abstract class IFill {
   long queryTime;
   TSDataType dataType;
 
-  IBatchReader allDataReader;
-
   public IFill(TSDataType dataType, long queryTime) {
     this.dataType = dataType;
     this.queryTime = queryTime;
@@ -51,17 +46,9 @@ public abstract class IFill {
 
   public abstract IFill copy();
 
-  public void constructReaders(Path path, Set<String> allSensors, QueryContext context)
-      throws StorageEngineException, QueryProcessException {
-    Filter timeFilter = constructFilter();
-    allDataReader = new SeriesRawDataBatchReader(path, allSensors, dataType, context,
-        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
-        timeFilter, null, null);
-  }
-
-  public void setAllDataReader(IBatchReader allDataReader) {
-    this.allDataReader = allDataReader;
-  }
+  public abstract void configureFill(Path path, TSDataType dataType, long queryTime,
+      Set<String> sensors, QueryContext context)
+      throws StorageEngineException, QueryProcessException;
 
   public Filter getFilter() {
     return constructFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index 0e4c20a..dd74809 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -19,13 +19,21 @@
 
 package org.apache.iotdb.db.query.fill;
 
+import java.util.Set;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
@@ -34,6 +42,7 @@ public class LinearFill extends IFill {
 
   private long beforeRange;
   private long afterRange;
+  private IBatchReader dataReader;
   private BatchData batchData;
 
   public LinearFill(long beforeRange, long afterRange) {
@@ -84,12 +93,24 @@ public class LinearFill extends IFill {
   }
 
   @Override
+  public void configureFill(Path path, TSDataType dataType, long queryTime,
+      Set<String> sensors, QueryContext context)
+      throws StorageEngineException, QueryProcessException {
+    this.dataType = dataType;
+    this.queryTime = queryTime;
+    Filter timeFilter = constructFilter();
+    dataReader = new SeriesRawDataBatchReader(path, sensors, dataType, context,
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
+        timeFilter, null, null);
+  }
+
+  @Override
   public TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException {
     TimeValuePair beforePair = null;
     TimeValuePair afterPair = null;
-    while (batchData.hasCurrent() || allDataReader.hasNextBatch()) {
-      if (!batchData.hasCurrent() && allDataReader.hasNextBatch()) {
-        batchData = allDataReader.nextBatch();
+    while (batchData.hasCurrent() || dataReader.hasNextBatch()) {
+      if (!batchData.hasCurrent() && dataReader.hasNextBatch()) {
+        batchData = dataReader.nextBatch();
       }
       afterPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
       batchData.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index 3d92fb6..19b6584 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -18,24 +18,49 @@
  */
 package org.apache.iotdb.db.query.fill;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 
 import java.io.IOException;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public class PreviousFill extends IFill {
 
+  private Path seriesPath;
+  private QueryContext context;
   private long beforeRange;
-  private BatchData batchData;
+  private Set<String> allSensors;
+  private Filter timeFilter;
+
+  private QueryDataSource dataSource;
+
+  private List<TimeseriesMetadata> unseqTimeseriesMetadataList;
 
   public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
     super(dataType, queryTime);
     this.beforeRange = beforeRange;
-    batchData = new BatchData();
+    this.unseqTimeseriesMetadataList = new ArrayList<>();
   }
 
   public PreviousFill(long beforeRange) {
@@ -44,7 +69,7 @@ public class PreviousFill extends IFill {
 
   @Override
   public IFill copy() {
-    return new PreviousFill(dataType, queryTime, beforeRange);
+    return new PreviousFill(dataType,  queryTime, beforeRange);
   }
 
   @Override
@@ -60,27 +85,188 @@ public class PreviousFill extends IFill {
   }
 
   @Override
+  public void configureFill(Path path, TSDataType dataType, long queryTime,
+      Set<String> sensors, QueryContext context)
+      throws StorageEngineException, QueryProcessException {
+    this.seriesPath = path;
+    this.dataType = dataType;
+    this.context = context;
+    this.queryTime = queryTime;
+    this.allSensors = sensors;
+    this.timeFilter = constructFilter();
+    this.dataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
+  }
+
+  @Override
   public TimeValuePair getFillResult() throws IOException {
-    TimeValuePair beforePair = null;
-    TimeValuePair cachedPair;
-    while (batchData.hasCurrent() || allDataReader.hasNextBatch()) {
-      if (!batchData.hasCurrent() && allDataReader.hasNextBatch()) {
-        batchData = allDataReader.nextBatch();
+    TimeValuePair lastPointResult = retrieveValidLastPointFromSeqFiles();
+    UnpackOverlappedUnseqFiles(lastPointResult.getTimestamp());
+
+    long lastVersion = 0;
+    PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
+    while (!sortedChunkMetatdataList.isEmpty()
+        && lastPointResult.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
+      ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
+      TimeValuePair lastChunkPoint = getChunkLastPoint(chunkMetadata);
+      if (shouldUpdate(lastPointResult.getTimestamp(), lastVersion,
+          lastChunkPoint.getTimestamp(), chunkMetadata.getVersion())) {
+        lastPointResult = lastChunkPoint;
+        lastVersion = chunkMetadata.getVersion();
       }
-      cachedPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
-      batchData.next();
-      if (cachedPair.getTimestamp() <= queryTime) {
-        beforePair = cachedPair;
-      } else {
+    }
+    return lastPointResult;
+  }
+
+  /** Find the last point in the sequence files that satisfies timeFilter, scanning the files in reverse order */
+  private TimeValuePair retrieveValidLastPointFromSeqFiles() throws IOException {
+    List<TsFileResource> seqFileResource = dataSource.getSeqResources();
+    TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
+    for (int index = seqFileResource.size() - 1; index >= 0; index--) {
+      TsFileResource resource = seqFileResource.get(index);
+      TimeseriesMetadata timeseriesMetadata =
+          FileLoaderUtils.loadTimeSeriesMetadata(
+              resource, seriesPath, context, timeFilter, allSensors);
+      if (timeseriesMetadata != null) {
+        if (timeseriesMetadata.getStatistics().canUseStatistics()
+            && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
+          return constructLastPair(
+              timeseriesMetadata.getStatistics().getEndTime(),
+              timeseriesMetadata.getStatistics().getLastValue(),
+              dataType);
+        } else {
+          List<ChunkMetadata> seqChunkMetadataList =
+              FileLoaderUtils.loadChunkMetadataList(timeseriesMetadata);
+
+          for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
+            lastPoint = getChunkLastPoint(seqChunkMetadataList.get(i));
+            // last point of this sequence chunk is valid, quit the loop
+            if (lastPoint.getValue() != null) {
+              return lastPoint;
+            }
+          }
+        }
+      }
+    }
+
+    return lastPoint;
+  }
+
+  /**
+   * find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files
+   */
+  private void UnpackOverlappedUnseqFiles(long lBoundTime) throws IOException {
+    PriorityQueue<TsFileResource> unseqFileResource =
+        sortUnSeqFileResourcesInDecendingOrder(dataSource.getUnseqResources());
+
+    while (!unseqFileResource.isEmpty()) {
+      // The largest end time among the remaining unseq files is smaller than lBoundTime,
+      // so skip all the remaining unseq files
+      if (unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()) < lBoundTime) {
+        return;
+      }
+      TimeseriesMetadata timeseriesMetadata =
+          FileLoaderUtils.loadTimeSeriesMetadata(
+              unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
+      if (timeseriesMetadata != null && timeseriesMetadata.getStatistics().canUseStatistics()
+          && lBoundTime <= timeseriesMetadata.getStatistics().getEndTime()) {
+        // The last timeseriesMetadata is used as a pivot to filter the remaining unseq files.
+        // Update lBoundTime with the last timeseriesMetadata's start time
+        lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getStartTime());
+        unseqTimeseriesMetadataList.add(timeseriesMetadata);
         break;
       }
     }
 
-    if (beforePair != null) {
-      beforePair.setTimestamp(queryTime);
-    } else {
-      beforePair = new TimeValuePair(queryTime, null);
+    // unpack all overlapped unseq files and fill unseqTimeseriesMetadata list
+    while (!unseqFileResource.isEmpty()
+        && (lBoundTime <= unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()))) {
+      TimeseriesMetadata timeseriesMetadata =
+          FileLoaderUtils.loadTimeSeriesMetadata(
+              unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
+      unseqTimeseriesMetadataList.add(timeseriesMetadata);
+      // update lBoundTime if current unseq timeseriesMetadata's last point is a valid result
+      if (timeseriesMetadata.getStatistics().canUseStatistics()
+          && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
+        lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getEndTime());
+      }
+    }
+  }
+
+  private TimeValuePair getChunkLastPoint(ChunkMetadata chunkMetaData) throws IOException {
+    TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
+    if (chunkMetaData == null) {
+      return lastPoint;
+    }
+    Statistics chunkStatistics = chunkMetaData.getStatistics();
+
+    if (chunkStatistics.canUseStatistics() && endtimeContainedByTimeFilter(chunkStatistics)) {
+      return constructLastPair(
+          chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), dataType);
+    }
+    List<IPageReader> pageReaders = FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter);
+    for (int i = pageReaders.size() - 1; i >= 0; i--) {
+      IPageReader pageReader = pageReaders.get(i);
+      Statistics pageStatistics = pageReader.getStatistics();
+      if (pageStatistics.canUseStatistics() && endtimeContainedByTimeFilter(pageStatistics)) {
+        lastPoint = constructLastPair(
+            pageStatistics.getEndTime(), pageStatistics.getLastValue(), dataType);
+      } else {
+        BatchData batchData = pageReader.getAllSatisfiedPageData();
+        lastPoint = batchData.getLastPairBeforeOrEqualTimestamp(queryTime);
+      }
+      if (lastPoint.getValue() != null) {
+        return lastPoint;
+      }
+    }
+    return lastPoint;
+  }
+
+  private boolean shouldUpdate(long time, long version, long newTime, long newVersion) {
+    return time < newTime || (time == newTime && version < newVersion);
+  }
+
+  private PriorityQueue<TsFileResource> sortUnSeqFileResourcesInDecendingOrder(
+      List<TsFileResource> tsFileResources) {
+    PriorityQueue<TsFileResource> unseqTsFilesSet =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              Map<String, Long> startTimeMap = o1.getEndTimeMap();
+              Long minTimeOfO1 = startTimeMap.get(seriesPath.getDevice());
+              Map<String, Long> startTimeMap2 = o2.getEndTimeMap();
+              Long minTimeOfO2 = startTimeMap2.get(seriesPath.getDevice());
+
+              return Long.compare(minTimeOfO2, minTimeOfO1);
+            });
+    unseqTsFilesSet.addAll(tsFileResources);
+    return unseqTsFilesSet;
+  }
+
+  private PriorityQueue<ChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException {
+    PriorityQueue<ChunkMetadata> chunkMetadataList =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              long endTime1 = o1.getEndTime();
+              long endTime2 = o2.getEndTime();
+              if (endTime1 < endTime2) {
+                return 1;
+              } else if (endTime1 > endTime2) {
+                return -1;
+              }
+              return Long.compare(o2.getVersion(), o1.getVersion());
+            });
+    for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
+      chunkMetadataList.addAll(timeseriesMetadata.loadChunkMetadataList());
     }
-    return beforePair;
+    return chunkMetadataList;
+  }
+
+  private boolean endtimeContainedByTimeFilter(Statistics statistics) {
+    return timeFilter.containStartEndTime(statistics.getEndTime(), statistics.getEndTime());
+  }
+
+  private TimeValuePair constructLastPair(long timestamp, Object value, TSDataType dataType) {
+    return new TimeValuePair(timestamp, TsPrimitiveType.getByType(dataType, value));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
index d831063..4884472 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
@@ -121,5 +121,4 @@ public class MemChunkReader implements IChunkReader, IPointReader {
     return Collections.singletonList(
         new MemPageReader(nextPageData(), readOnlyMemChunk.getChunkMetaData().getStatistics()));
   }
-
 }
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 86aadaf..f6b1e37 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -513,7 +513,7 @@ public class IoTDBAlignByDeviceIT {
   @Test
   public void fillTest() throws ClassNotFoundException {
     String[] retArray = new String[]{
-        "3,root.vehicle.d0,10000,40208,3.33,null,null,",
+        "3,root.vehicle.d0,10000,40000,3.33,null,null,",
         "3,root.vehicle.d1,999,null,null,null,null,",
     };
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index cb7bf18..4bcf9dd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -95,9 +95,38 @@ public class IoTDBFillIT {
           + "values(620, 500.5, false, 550)",
   };
 
+  private static String[] dataSet2 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt02",
+      "CREATE TIMESERIES root.ln.wf01.wt02.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt02.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(100, 100.1, false)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(150, 200.2, true)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(300, 500.5, false)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(600, 31.1, false)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(750, 55.2, true)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(900, 1020.5, false)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(1100, 98.41, false)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(1250, 220.2, true)",
+      "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+          + "values(1400, 31, false)",
+      "flush",
+  };
+
   private static final String TIMESTAMP_STR = "Time";
-  private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
-  private static final String STATUS_STR = "root.ln.wf01.wt01.status";
+  private static final String TEMPERATURE_STR_1 = "root.ln.wf01.wt01.temperature";
+  private static final String STATUS_STR_1 = "root.ln.wf01.wt01.status";
+  private static final String TEMPERATURE_STR_2 = "root.ln.wf01.wt02.temperature";
+  private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
   private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
 
   @Before
@@ -135,8 +164,8 @@ public class IoTDBFillIT {
       try (ResultSet resultSet = statement.getResultSet()) {
         cnt = 0;
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -149,8 +178,8 @@ public class IoTDBFillIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -163,8 +192,8 @@ public class IoTDBFillIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -177,8 +206,8 @@ public class IoTDBFillIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -209,8 +238,8 @@ public class IoTDBFillIT {
       try (ResultSet resultSet = statement.getResultSet()) {
         cnt = 0;
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -223,8 +252,8 @@ public class IoTDBFillIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -237,8 +266,8 @@ public class IoTDBFillIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -269,8 +298,8 @@ Statement statement = connection.createStatement()) {
       try (ResultSet resultSet = statement.getResultSet()) {
         cnt = 0;
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -283,8 +312,252 @@ Statement statement = connection.createStatement()) {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
-              + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs previous fill on root.ln.wf01.wt02 at times 1050 and 800 without inserting any points of
+   * its own. Each result set is closed and the total row count is asserted.
+   */
+  @Test
+  public void PreviousFillWithOnlySeqFileTest() throws SQLException {
+    String[] retArray = new String[]{
+        "1050,1020.5,false",
+        "800,55.2,true"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      int cnt = 0;
+      // One fill query per expected row, issued in the same order as retArray.
+      for (long queryTime : new long[]{1050, 800}) {
+        // try-with-resources releases each result set before the statement is reused.
+        try (ResultSet resultSet = statement.executeQuery("select temperature,status "
+            + "from root.ln.wf01.wt02 where time = " + queryTime + " Fill(double[previous])")) {
+          while (resultSet.next()) {
+            String ans =
+                resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+                    + "," + resultSet.getString(STATUS_STR_2);
+            Assert.assertEquals(retArray[cnt], ans);
+            cnt++;
+          }
+        }
+      }
+
+      // Every expected row must have been produced; otherwise the loop above verified nothing.
+      Assert.assertEquals(retArray.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Flushes two batches of points for root.ln.wf01.wt02 with interleaved timestamps (50 and 35,
+   * then 25 and 78) and runs previous fill at three query times:
+   * <ul>
+   *   <li>58 expects the values inserted at 50,</li>
+   *   <li>40 expects the values inserted at 35,</li>
+   *   <li>80 expects the values inserted at 78.</li>
+   * </ul>
+   * Each result set is closed and the total row count is asserted.
+   */
+  @Test
+  public void PreviousFillWithOnlyUnseqFileOverlappedTest() throws SQLException {
+    String[] retArray = new String[]{
+        "58,82.1,true",
+        "40,121.22,true",
+        "80,32.2,false"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 82.1, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(35, 121.22, true)");
+      statement.execute("flush");
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(25, 102.15, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(78, 32.2, false)");
+      statement.execute("flush");
+
+      int cnt = 0;
+      // One fill query per expected row, issued in the same order as retArray.
+      for (long queryTime : new long[]{58, 40, 80}) {
+        // try-with-resources releases each result set before the statement is reused.
+        try (ResultSet resultSet = statement.executeQuery("select temperature,status "
+            + "from root.ln.wf01.wt02 where time = " + queryTime + " Fill(double[previous])")) {
+          while (resultSet.next()) {
+            String ans = resultSet.getString(TIMESTAMP_STR) + ","
+                + resultSet.getString(TEMPERATURE_STR_2) + "," + resultSet.getString(STATUS_STR_2);
+            Assert.assertEquals(retArray[cnt], ans);
+            cnt++;
+          }
+        }
+      }
+
+      // Every expected row must have been produced; otherwise the loop above verified nothing.
+      Assert.assertEquals(retArray.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Previous fill at 59 after one flush, then at 52 once a second flush re-inserts time 50.
+   */
+  @Test
+  public void PreviousFillMultiUnseqFileWithSameLastTest() throws SQLException {
+    String[] retArray = new String[]{"59,82.1,true", "52,32.2,false"};
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 82.1, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(35, 121.22, true)");
+      statement.execute("flush");
+
+      int cnt = 0;
+      try (ResultSet resultSet = statement.executeQuery("select temperature,status "
+          + "from root.ln.wf01.wt02 where time = 59 Fill(double[previous])")) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+              + "," + resultSet.getString(STATUS_STR_2);
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+      }
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(25, 102.15, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 32.2, false)");
+      statement.execute("flush");
+      try (ResultSet resultSet = statement.executeQuery("select temperature,status "
+          + "from root.ln.wf01.wt02 where time = 52 Fill(double[previous])")) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+              + "," + resultSet.getString(STATUS_STR_2);
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+      }
+      Assert.assertEquals(retArray.length, cnt); // an empty result set must not pass silently
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Flushes two batches for root.ln.wf01.wt02 (times 950 and 650, then 740, 420 and 890) and
+   * runs previous fill at two query times:
+   * <ul>
+   *   <li>886 expects 55.2/true, a value pair this test does not insert itself,</li>
+   *   <li>730 expects the values inserted at 650.</li>
+   * </ul>
+   * Each result set is closed via try-with-resources and the total row count is asserted, so a
+   * query that returns no rows fails the test.
+   */
+  @Test
+  public void PreviousFillSeqFileFilterOverlappedFilesTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "886,55.2,true",
+        "730,121.22,true",
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      int cnt = 0;
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(950, 82.1, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(650, 121.22, true)");
+      statement.execute("flush");
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(740, 33.1, false)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(420, 125.1, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(890, 22.82, false)");
+      statement.execute("flush");
+
+      // One fill query per expected row, issued in the same order as retArray1.
+      for (long queryTime : new long[]{886, 730}) {
+        // try-with-resources releases each result set before the statement is reused.
+        try (ResultSet resultSet = statement.executeQuery(
+            "select temperature,status from root.ln.wf01.wt02 where time = " + queryTime
+                + " Fill(double[previous])")) {
+          while (resultSet.next()) {
+            String ans = resultSet.getString(TIMESTAMP_STR) + ","
+                + resultSet.getString(TEMPERATURE_STR_2) + ","
+                + resultSet.getString(STATUS_STR_2);
+            Assert.assertEquals(retArray1[cnt], ans);
+            cnt++;
+          }
+        }
+      }
+
+      // Every expected row must have been produced; otherwise the loop above verified nothing.
+      Assert.assertEquals(retArray1.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void PreviousFillUnseqFileFilterOverlappedFilesTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "990,121.22,true",
+        "925,33.1,false",
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      int cnt = 0;
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(1030, 82.1, true)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(940, 121.22, true)");
+      statement.execute("flush");
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(740, 62.1, false)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,status) values(980, true)");
+      statement.execute("flush");
+
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(910, 33.1, false)");
+      statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(620, 125.1, true)");
+      statement.execute("flush");
+
+      {
+        ResultSet resultSet = statement.executeQuery(
+            "select temperature,status from root.ln.wf01.wt02 where time = 990 "
+                + "Fill(double[previous])");
+
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + ","
+              + resultSet.getString(TEMPERATURE_STR_2) + ","
+              + resultSet.getString(STATUS_STR_2);
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+      }
+
+      {
+        ResultSet resultSet = statement.executeQuery(
+            "select temperature,status from root.ln.wf01.wt02 where time = 925 "
+                + "Fill(double[previous])");
+
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + ","
+                  + resultSet.getString(TEMPERATURE_STR_2) + ","
+                  + resultSet.getString(STATUS_STR_2);
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -305,6 +578,9 @@ Statement statement = connection.createStatement()) {
       for (String sql : dataSet1) {
         statement.execute(sql);
       }
+      for (String sql : dataSet2) {
+        statement.execute(sql);
+      }
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index d228192..2d9784a 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -865,4 +865,4 @@ var config = {
   }
   
   module.exports = config
-  
\ No newline at end of file
+  
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 1c4ae73..a893c1b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -533,6 +534,17 @@ public class BatchData implements Serializable {
     return booleanRet.get(idx / capacity)[idx % capacity];
   }
 
+  /** Calls resetBatchData(), then steps with next() while currentTime() <= queryTime, returning
+   *  the last point visited; the pair stays (Long.MIN_VALUE, null) if the loop never runs. */
+  public TimeValuePair getLastPairBeforeOrEqualTimestamp(long queryTime) {
+    TimeValuePair latest = new TimeValuePair(Long.MIN_VALUE, null);
+    for (resetBatchData(); hasCurrent() && currentTime() <= queryTime; next()) {
+      latest.setTimestamp(currentTime());
+      latest.setValue(currentTsPrimitiveType());
+    }
+    return latest;
+  }
+
   public Object getValueInTimestamp(long time) {
     while (hasCurrent()) {
       if (currentTime() < time) {