You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/11 02:46:15 UTC
[incubator-iotdb] branch master updated: [IOTDB-540] Accelerate Previous Fill (#931)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new faf4924 [IOTDB-540] Accelerate Previous Fill (#931)
faf4924 is described below
commit faf4924c328d94f49f58c5caedd0e25e4c073024
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Sat Apr 11 10:46:03 2020 +0800
[IOTDB-540] Accelerate Previous Fill (#931)
* Reimplement PreviousFill
---
.../resources/conf/iotdb-engine.properties | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 2 +-
.../iotdb/db/query/executor/FillQueryExecutor.java | 12 +-
.../java/org/apache/iotdb/db/query/fill/IFill.java | 19 +-
.../org/apache/iotdb/db/query/fill/LinearFill.java | 27 +-
.../apache/iotdb/db/query/fill/PreviousFill.java | 222 +++++++++++++--
.../db/query/reader/chunk/MemChunkReader.java | 1 -
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 2 +-
.../apache/iotdb/db/integration/IoTDBFillIT.java | 316 +++++++++++++++++++--
site/src/main/.vuepress/config.js | 2 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 12 +
12 files changed, 548 insertions(+), 75 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e0a13d7..b388823 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -238,8 +238,8 @@ upgrade_thread_num=1
### Query Configurations
####################
-# the default time period that used in fill query, 10min by default
-default_fill_interval=600000
+# the default time period that used in fill query, -1 by default means infinite past time
+default_fill_interval=-1
####################
### Merge Configurations
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fa3013a..a1dee4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -510,9 +510,9 @@ public class IoTDBConfig {
private int memtableNumInEachStorageGroup = 10;
/**
- * the default fill interval in LinearFill and PreviousFill, 10min
+ * the default fill interval in LinearFill and PreviousFill, -1 means infinite past time
*/
- private int defaultFillInterval = 600000;
+ private int defaultFillInterval = -1;
/**
* default TTL for storage groups that are not set TTL by statements, in ms
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ce20b85..43eac5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -280,7 +280,7 @@ public class TsFileResource {
}
public List<ChunkMetadata> getChunkMetadataList() {
- return chunkMetadataList;
+ return new ArrayList<>(chunkMetadataList);
}
public List<ReadOnlyMemChunk> getReadOnlyMemChunk() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 48e2a86..6b823c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -77,8 +77,6 @@ public class FillQueryExecutor {
case INT64:
case FLOAT:
case DOUBLE:
- fill = new LinearFill(dataType, queryTime, defaultFillInterval, defaultFillInterval);
- break;
case BOOLEAN:
case TEXT:
fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
@@ -89,7 +87,8 @@ public class FillQueryExecutor {
} else {
fill = typeIFillMap.get(dataType).copy();
}
- configureFill(fill, dataType, path, fillQueryPlan.getAllMeasurementsInDevice(path.getDevice()), context, queryTime);
+ fill.configureFill(path, dataType, queryTime,
+ fillQueryPlan.getAllMeasurementsInDevice(path.getDevice()), context);
TimeValuePair timeValuePair = fill.getFillResult();
if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -103,11 +102,4 @@ public class FillQueryExecutor {
dataSet.setRecord(record);
return dataSet;
}
-
- protected void configureFill(IFill fill, TSDataType dataType, Path path, Set<String> allSensors, QueryContext context,
- long queryTime) throws StorageEngineException, QueryProcessException {
- fill.setDataType(dataType);
- fill.setQueryTime(queryTime);
- fill.constructReaders(path, allSensors, context);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index 1f3f686..675fed8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -23,13 +23,10 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import java.io.IOException;
import java.util.Set;
@@ -39,8 +36,6 @@ public abstract class IFill {
long queryTime;
TSDataType dataType;
- IBatchReader allDataReader;
-
public IFill(TSDataType dataType, long queryTime) {
this.dataType = dataType;
this.queryTime = queryTime;
@@ -51,17 +46,9 @@ public abstract class IFill {
public abstract IFill copy();
- public void constructReaders(Path path, Set<String> allSensors, QueryContext context)
- throws StorageEngineException, QueryProcessException {
- Filter timeFilter = constructFilter();
- allDataReader = new SeriesRawDataBatchReader(path, allSensors, dataType, context,
- QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
- timeFilter, null, null);
- }
-
- public void setAllDataReader(IBatchReader allDataReader) {
- this.allDataReader = allDataReader;
- }
+ public abstract void configureFill(Path path, TSDataType dataType, long queryTime,
+ Set<String> sensors, QueryContext context)
+ throws StorageEngineException, QueryProcessException;
public Filter getFilter() {
return constructFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index 0e4c20a..dd74809 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -19,13 +19,21 @@
package org.apache.iotdb.db.query.fill;
+import java.util.Set;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
@@ -34,6 +42,7 @@ public class LinearFill extends IFill {
private long beforeRange;
private long afterRange;
+ private IBatchReader dataReader;
private BatchData batchData;
public LinearFill(long beforeRange, long afterRange) {
@@ -84,12 +93,24 @@ public class LinearFill extends IFill {
}
@Override
+ public void configureFill(Path path, TSDataType dataType, long queryTime,
+ Set<String> sensors, QueryContext context)
+ throws StorageEngineException, QueryProcessException {
+ this.dataType = dataType;
+ this.queryTime = queryTime;
+ Filter timeFilter = constructFilter();
+ dataReader = new SeriesRawDataBatchReader(path, sensors, dataType, context,
+ QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
+ timeFilter, null, null);
+ }
+
+ @Override
public TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException {
TimeValuePair beforePair = null;
TimeValuePair afterPair = null;
- while (batchData.hasCurrent() || allDataReader.hasNextBatch()) {
- if (!batchData.hasCurrent() && allDataReader.hasNextBatch()) {
- batchData = allDataReader.nextBatch();
+ while (batchData.hasCurrent() || dataReader.hasNextBatch()) {
+ if (!batchData.hasCurrent() && dataReader.hasNextBatch()) {
+ batchData = dataReader.nextBatch();
}
afterPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
batchData.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index 3d92fb6..19b6584 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -18,24 +18,49 @@
*/
package org.apache.iotdb.db.query.fill;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import java.io.IOException;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
public class PreviousFill extends IFill {
+ private Path seriesPath;
+ private QueryContext context;
private long beforeRange;
- private BatchData batchData;
+ private Set<String> allSensors;
+ private Filter timeFilter;
+
+ private QueryDataSource dataSource;
+
+ private List<TimeseriesMetadata> unseqTimeseriesMetadataList;
public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
super(dataType, queryTime);
this.beforeRange = beforeRange;
- batchData = new BatchData();
+ this.unseqTimeseriesMetadataList = new ArrayList<>();
}
public PreviousFill(long beforeRange) {
@@ -44,7 +69,7 @@ public class PreviousFill extends IFill {
@Override
public IFill copy() {
- return new PreviousFill(dataType, queryTime, beforeRange);
+ return new PreviousFill(dataType, queryTime, beforeRange);
}
@Override
@@ -60,27 +85,188 @@ public class PreviousFill extends IFill {
}
@Override
+ public void configureFill(Path path, TSDataType dataType, long queryTime,
+ Set<String> sensors, QueryContext context)
+ throws StorageEngineException, QueryProcessException {
+ this.seriesPath = path;
+ this.dataType = dataType;
+ this.context = context;
+ this.queryTime = queryTime;
+ this.allSensors = sensors;
+ this.timeFilter = constructFilter();
+ this.dataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+ // update filter by TTL
+ timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
+ }
+
+ @Override
public TimeValuePair getFillResult() throws IOException {
- TimeValuePair beforePair = null;
- TimeValuePair cachedPair;
- while (batchData.hasCurrent() || allDataReader.hasNextBatch()) {
- if (!batchData.hasCurrent() && allDataReader.hasNextBatch()) {
- batchData = allDataReader.nextBatch();
+ TimeValuePair lastPointResult = retrieveValidLastPointFromSeqFiles();
+ UnpackOverlappedUnseqFiles(lastPointResult.getTimestamp());
+
+ long lastVersion = 0;
+ PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
+ while (!sortedChunkMetatdataList.isEmpty()
+ && lastPointResult.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
+ ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
+ TimeValuePair lastChunkPoint = getChunkLastPoint(chunkMetadata);
+ if (shouldUpdate(lastPointResult.getTimestamp(), lastVersion,
+ lastChunkPoint.getTimestamp(), chunkMetadata.getVersion())) {
+ lastPointResult = lastChunkPoint;
+ lastVersion = chunkMetadata.getVersion();
}
- cachedPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
- batchData.next();
- if (cachedPair.getTimestamp() <= queryTime) {
- beforePair = cachedPair;
- } else {
+ }
+ return lastPointResult;
+ }
+
+ /** Pick up and cache the last sequence TimeseriesMetadata that satisfies timeFilter */
+ private TimeValuePair retrieveValidLastPointFromSeqFiles() throws IOException {
+ List<TsFileResource> seqFileResource = dataSource.getSeqResources();
+ TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
+ for (int index = seqFileResource.size() - 1; index >= 0; index--) {
+ TsFileResource resource = seqFileResource.get(index);
+ TimeseriesMetadata timeseriesMetadata =
+ FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, seriesPath, context, timeFilter, allSensors);
+ if (timeseriesMetadata != null) {
+ if (timeseriesMetadata.getStatistics().canUseStatistics()
+ && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
+ return constructLastPair(
+ timeseriesMetadata.getStatistics().getEndTime(),
+ timeseriesMetadata.getStatistics().getLastValue(),
+ dataType);
+ } else {
+ List<ChunkMetadata> seqChunkMetadataList =
+ FileLoaderUtils.loadChunkMetadataList(timeseriesMetadata);
+
+ for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
+ lastPoint = getChunkLastPoint(seqChunkMetadataList.get(i));
+ // last point of this sequence chunk is valid, quit the loop
+ if (lastPoint.getValue() != null) {
+ return lastPoint;
+ }
+ }
+ }
+ }
+ }
+
+ return lastPoint;
+ }
+
+ /**
+ * find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files
+ */
+ private void UnpackOverlappedUnseqFiles(long lBoundTime) throws IOException {
+ PriorityQueue<TsFileResource> unseqFileResource =
+ sortUnSeqFileResourcesInDecendingOrder(dataSource.getUnseqResources());
+
+ while (!unseqFileResource.isEmpty()) {
+ // The very end time of unseq files is smaller than lBoundTime,
+ // then skip all the rest unseq files
+ if (unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()) < lBoundTime) {
+ return;
+ }
+ TimeseriesMetadata timeseriesMetadata =
+ FileLoaderUtils.loadTimeSeriesMetadata(
+ unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
+ if (timeseriesMetadata != null && timeseriesMetadata.getStatistics().canUseStatistics()
+ && lBoundTime <= timeseriesMetadata.getStatistics().getEndTime()) {
+ // The last timeseriesMetadata will be used as a pivot to filter the rest unseq files.
+ // Update lBoundTime with the last timeseriesMetadata's start time
+ lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getStartTime());
+ unseqTimeseriesMetadataList.add(timeseriesMetadata);
break;
}
}
- if (beforePair != null) {
- beforePair.setTimestamp(queryTime);
- } else {
- beforePair = new TimeValuePair(queryTime, null);
+ // unpack all overlapped unseq files and fill unseqTimeseriesMetadata list
+ while (!unseqFileResource.isEmpty()
+ && (lBoundTime <= unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()))) {
+ TimeseriesMetadata timeseriesMetadata =
+ FileLoaderUtils.loadTimeSeriesMetadata(
+ unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
+ unseqTimeseriesMetadataList.add(timeseriesMetadata);
+ // update lBoundTime if current unseq timeseriesMetadata's last point is a valid result
+ if (timeseriesMetadata.getStatistics().canUseStatistics()
+ && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
+ lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getEndTime());
+ }
+ }
+ }
+
+ private TimeValuePair getChunkLastPoint(ChunkMetadata chunkMetaData) throws IOException {
+ TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
+ if (chunkMetaData == null) {
+ return lastPoint;
+ }
+ Statistics chunkStatistics = chunkMetaData.getStatistics();
+
+ if (chunkStatistics.canUseStatistics() && endtimeContainedByTimeFilter(chunkStatistics)) {
+ return constructLastPair(
+ chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), dataType);
+ }
+ List<IPageReader> pageReaders = FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter);
+ for (int i = pageReaders.size() - 1; i >= 0; i--) {
+ IPageReader pageReader = pageReaders.get(i);
+ Statistics pageStatistics = pageReader.getStatistics();
+ if (pageStatistics.canUseStatistics() && endtimeContainedByTimeFilter(pageStatistics)) {
+ lastPoint = constructLastPair(
+ pageStatistics.getEndTime(), pageStatistics.getLastValue(), dataType);
+ } else {
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ lastPoint = batchData.getLastPairBeforeOrEqualTimestamp(queryTime);
+ }
+ if (lastPoint.getValue() != null) {
+ return lastPoint;
+ }
+ }
+ return lastPoint;
+ }
+
+ private boolean shouldUpdate(long time, long version, long newTime, long newVersion) {
+ return time < newTime || (time == newTime && version < newVersion);
+ }
+
+ private PriorityQueue<TsFileResource> sortUnSeqFileResourcesInDecendingOrder(
+ List<TsFileResource> tsFileResources) {
+ PriorityQueue<TsFileResource> unseqTsFilesSet =
+ new PriorityQueue<>(
+ (o1, o2) -> {
+ Map<String, Long> startTimeMap = o1.getEndTimeMap();
+ Long minTimeOfO1 = startTimeMap.get(seriesPath.getDevice());
+ Map<String, Long> startTimeMap2 = o2.getEndTimeMap();
+ Long minTimeOfO2 = startTimeMap2.get(seriesPath.getDevice());
+
+ return Long.compare(minTimeOfO2, minTimeOfO1);
+ });
+ unseqTsFilesSet.addAll(tsFileResources);
+ return unseqTsFilesSet;
+ }
+
+ private PriorityQueue<ChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException {
+ PriorityQueue<ChunkMetadata> chunkMetadataList =
+ new PriorityQueue<>(
+ (o1, o2) -> {
+ long endTime1 = o1.getEndTime();
+ long endTime2 = o2.getEndTime();
+ if (endTime1 < endTime2) {
+ return 1;
+ } else if (endTime1 > endTime2) {
+ return -1;
+ }
+ return Long.compare(o2.getVersion(), o1.getVersion());
+ });
+ for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
+ chunkMetadataList.addAll(timeseriesMetadata.loadChunkMetadataList());
}
- return beforePair;
+ return chunkMetadataList;
+ }
+
+ private boolean endtimeContainedByTimeFilter(Statistics statistics) {
+ return timeFilter.containStartEndTime(statistics.getEndTime(), statistics.getEndTime());
+ }
+
+ private TimeValuePair constructLastPair(long timestamp, Object value, TSDataType dataType) {
+ return new TimeValuePair(timestamp, TsPrimitiveType.getByType(dataType, value));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
index d831063..4884472 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
@@ -121,5 +121,4 @@ public class MemChunkReader implements IChunkReader, IPointReader {
return Collections.singletonList(
new MemPageReader(nextPageData(), readOnlyMemChunk.getChunkMetaData().getStatistics()));
}
-
}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 86aadaf..f6b1e37 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -513,7 +513,7 @@ public class IoTDBAlignByDeviceIT {
@Test
public void fillTest() throws ClassNotFoundException {
String[] retArray = new String[]{
- "3,root.vehicle.d0,10000,40208,3.33,null,null,",
+ "3,root.vehicle.d0,10000,40000,3.33,null,null,",
"3,root.vehicle.d1,999,null,null,null,null,",
};
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index cb7bf18..4bcf9dd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -95,9 +95,38 @@ public class IoTDBFillIT {
+ "values(620, 500.5, false, 550)",
};
+ private static String[] dataSet2 = new String[]{
+ "SET STORAGE GROUP TO root.ln.wf01.wt02",
+ "CREATE TIMESERIES root.ln.wf01.wt02.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt02.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(100, 100.1, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(150, 200.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(300, 500.5, false)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(600, 31.1, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(750, 55.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(900, 1020.5, false)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(1100, 98.41, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(1250, 220.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
+ + "values(1400, 31, false)",
+ "flush",
+ };
+
private static final String TIMESTAMP_STR = "Time";
- private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
- private static final String STATUS_STR = "root.ln.wf01.wt01.status";
+ private static final String TEMPERATURE_STR_1 = "root.ln.wf01.wt01.temperature";
+ private static final String STATUS_STR_1 = "root.ln.wf01.wt01.status";
+ private static final String TEMPERATURE_STR_2 = "root.ln.wf01.wt02.temperature";
+ private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
@Before
@@ -135,8 +164,8 @@ public class IoTDBFillIT {
try (ResultSet resultSet = statement.getResultSet()) {
cnt = 0;
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -149,8 +178,8 @@ public class IoTDBFillIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -163,8 +192,8 @@ public class IoTDBFillIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -177,8 +206,8 @@ public class IoTDBFillIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -209,8 +238,8 @@ public class IoTDBFillIT {
try (ResultSet resultSet = statement.getResultSet()) {
cnt = 0;
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -223,8 +252,8 @@ public class IoTDBFillIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -237,8 +266,8 @@ public class IoTDBFillIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -269,8 +298,8 @@ Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.getResultSet()) {
cnt = 0;
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -283,8 +312,252 @@ Statement statement = connection.createStatement()) {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR)
- + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR);
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+ + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void PreviousFillWithOnlySeqFileTest() throws SQLException {
+ String[] retArray = new String[]{
+ "1050,1020.5,false",
+ "800,55.2,true"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ ResultSet resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 1050 Fill(double[previous])");
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+
+ resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 800 Fill(double[previous])");
+
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void PreviousFillWithOnlyUnseqFileOverlappedTest() throws SQLException {
+ String[] retArray = new String[]{
+ "58,82.1,true",
+ "40,121.22,true",
+ "80,32.2,false"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 82.1, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(35, 121.22, true)");
+ statement.execute("flush");
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(25, 102.15, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(78, 32.2, false)");
+ statement.execute("flush");
+
+ int cnt = 0;
+ ResultSet resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 58 Fill(double[previous])");
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+
+ resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 40 Fill(double[previous])");
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+
+ resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 80 Fill(double[previous])");
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void PreviousFillMultiUnseqFileWithSameLastTest() throws SQLException {
+ String[] retArray = new String[]{
+ "59,82.1,true",
+ "52,32.2,false",
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 82.1, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(35, 121.22, true)");
+ statement.execute("flush");
+
+ int cnt = 0;
+ ResultSet resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 59 Fill(double[previous])");
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(25, 102.15, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(50, 32.2, false)");
+ statement.execute("flush");
+
+ resultSet = statement.executeQuery("select temperature,status "
+ + "from root.ln.wf01.wt02 where time = 52 Fill(double[previous])");
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_2)
+ + "," + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void PreviousFillSeqFileFilterOverlappedFilesTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "886,55.2,true",
+ "730,121.22,true",
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ int cnt = 0;
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(950, 82.1, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(650, 121.22, true)");
+ statement.execute("flush");
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(740, 33.1, false)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(420, 125.1, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(890, 22.82, false)");
+ statement.execute("flush");
+
+ {
+ ResultSet resultSet = statement.executeQuery(
+ "select temperature,status from root.ln.wf01.wt02 where time = 886 "
+ + "Fill(double[previous])");
+
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ + resultSet.getString(TEMPERATURE_STR_2) + ","
+ + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ }
+
+ {
+ ResultSet resultSet = statement.executeQuery(
+ "select temperature,status from root.ln.wf01.wt02 where time = 730 "
+ + "Fill(double[previous])");
+
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + ","
+ + resultSet.getString(TEMPERATURE_STR_2) + ","
+ + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void PreviousFillUnseqFileFilterOverlappedFilesTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "990,121.22,true",
+ "925,33.1,false",
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ int cnt = 0;
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(1030, 82.1, true)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(940, 121.22, true)");
+ statement.execute("flush");
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(740, 62.1, false)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,status) values(980, true)");
+ statement.execute("flush");
+
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(910, 33.1, false)");
+ statement.execute("INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(620, 125.1, true)");
+ statement.execute("flush");
+
+ {
+ ResultSet resultSet = statement.executeQuery(
+ "select temperature,status from root.ln.wf01.wt02 where time = 990 "
+ + "Fill(double[previous])");
+
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + ","
+ + resultSet.getString(TEMPERATURE_STR_2) + ","
+ + resultSet.getString(STATUS_STR_2);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ }
+
+ {
+ ResultSet resultSet = statement.executeQuery(
+ "select temperature,status from root.ln.wf01.wt02 where time = 925 "
+ + "Fill(double[previous])");
+
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + ","
+ + resultSet.getString(TEMPERATURE_STR_2) + ","
+ + resultSet.getString(STATUS_STR_2);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -305,6 +578,9 @@ Statement statement = connection.createStatement()) {
for (String sql : dataSet1) {
statement.execute(sql);
}
+ for (String sql : dataSet2) {
+ statement.execute(sql);
+ }
} catch (Exception e) {
e.printStackTrace();
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index d228192..2d9784a 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -865,4 +865,4 @@ var config = {
}
module.exports = config
-
\ No newline at end of file
+
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 1c4ae73..a893c1b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -533,6 +534,17 @@ public class BatchData implements Serializable {
return booleanRet.get(idx / capacity)[idx % capacity];
}
+ public TimeValuePair getLastPairBeforeOrEqualTimestamp(long queryTime) {
+ TimeValuePair resultPair = new TimeValuePair(Long.MIN_VALUE, null);
+ resetBatchData();
+ while (hasCurrent() && (currentTime() <= queryTime)) {
+ resultPair.setTimestamp(currentTime());
+ resultPair.setValue(currentTsPrimitiveType());
+ next();
+ }
+ return resultPair;
+ }
+
public Object getValueInTimestamp(long time) {
while (hasCurrent()) {
if (currentTime() < time) {