You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/26 12:40:11 UTC
[incubator-iotdb] 01/01: add new Series reader
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 588c640ab7cfc220081b15a4fcc37f6a991b35e7
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Dec 26 20:39:48 2019 +0800
add new Series reader
---
.../db/engine/querycontext/QueryDataSource.java | 2 +-
.../groupby/GroupByWithoutValueFilterDataSet.java | 2 +-
.../db/query/executor/AggregateEngineExecutor.java | 2 +-
.../NewUnseqResourceMergeReader.java | 12 +-
.../NewSeriesReaderWithoutValueFilter.java | 278 +++++++++++++++++++++
.../SeriesReaderWithoutValueFilter.java | 2 +-
6 files changed, 289 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 82fb622..cc4611c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -67,7 +67,7 @@ public class QueryDataSource {
*
* @return an updated time filter concerning TTL
*/
- public Filter updateTimeFilter(Filter timeFilter) {
+ public Filter setTTL(Filter timeFilter) {
if (dataTTL != Long.MAX_VALUE) {
if (timeFilter != null) {
timeFilter = new AndFilter(timeFilter, TimeFilter.gtEq(System.currentTimeMillis() -
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index d44ce26..66372f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -80,7 +80,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
for (Path path : paths) {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(path, context);
- timeFilter = queryDataSource.updateTimeFilter(timeFilter);
+ timeFilter = queryDataSource.setTTL(timeFilter);
// sequence reader for sealed tsfile, unsealed tsfile, memory
IAggregateReader seqResourceIterateReader = new SeqResourceIterateReader(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 2dc1b4b..705b638 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -101,7 +101,7 @@ public class AggregateEngineExecutor {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(selectedSeries.get(i), context);
// add additional time filter if TTL is set
- timeFilter = queryDataSource.updateTimeFilter(timeFilter);
+ timeFilter = queryDataSource.setTTL(timeFilter);
// sequence reader for sealed tsfile, unsealed tsfile, memory
IAggregateReader seqResourceIterateReader;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index 61e8763..5620504 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -88,16 +88,18 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
- List<Modification> pathModifications = context
- .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
- }
} else {
// metadata list of already flushed chunk groups
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}
+ // get modifications and apply to disk chunk metadatas
+ List<Modification> pathModifications = context
+ .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+ }
+
if (!currentChunkMetaDataList.isEmpty()) {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsFileResource, tsFileResource.isClosed());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/NewSeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/NewSeriesReaderWithoutValueFilter.java
new file mode 100644
index 0000000..7c42e9d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/NewSeriesReaderWithoutValueFilter.java
@@ -0,0 +1,278 @@
+package org.apache.iotdb.db.query.reader.seriesRelated;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+
+public class NewSeriesReaderWithoutValueFilter {
+
+  /** Sequence TsFile resources whose chunk metadata has not been loaded yet. */
+  private List<TsFileResource> seqTsFiles;
+  /** Chunk metadata already loaded from sequence files; consumed from the head. */
+  private List<ChunkMetaData> seqChunkMetadatas;
+
+  /**
+   * Unsequence TsFile resources whose chunk metadata has not been loaded yet. Typed as a List
+   * because every use in this class is positional (remove(0)); java.util.TreeSet has no
+   * positional remove or get.
+   */
+  private List<TsFileResource> unseqTsFiles;
+  /**
+   * Chunk metadata already loaded from unsequence files; consumed from the head.
+   * TODO(review): hasNextChunk() only compares head elements, so this list presumably has to be
+   * ordered by start time; nothing in this class sorts it yet.
+   */
+  private List<ChunkMetaData> unseqChunkMetadatas;
+
+  /** Time filter after QueryDataSource#setTTL has been applied to it. */
+  private Filter timeFilter;
+  private TSDataType dataType;
+
+  /** Chunk metadata selected by hasNextChunk() and handed out by nextChunkMetadata(). */
+  private ChunkMetaData cachedChunkMetaData;
+  private boolean hasCachedNextChunk;
+
+  private AbstractChunkReader currentChunkReader;
+  private boolean isCurrentChunkReaderInit = false;
+
+  private PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
+
+  private Path path;
+  private QueryContext context;
+
+  /** Page header selected by hasNextPage() and handed out by nextPageHeader(). */
+  private PageHeader cachedPageHeader;
+  private boolean hasCachedNextPage;
+
+  /**
+   * Builds a reader for one series and eagerly loads the chunk metadata of every sequence and
+   * unsequence file of that series, in the order the files are listed by the data source.
+   *
+   * NOTE(review): the original comments said only unsealed files should be loaded up front, but
+   * the loops never checked isClosed(); the load-everything behaviour is kept here. As a
+   * consequence the lazy refill in hasNextChunk() never finds a pending file.
+   *
+   * @param seriesPath path of the series to read
+   * @param dataType data type used for the batches this reader builds
+   * @param timeFilter time filter to apply; it is passed through QueryDataSource#setTTL first
+   * @param context query context used to look up modifications
+   * @throws StorageEngineException declared for the QueryResourceManager lookup
+   * @throws IOException if chunk metadata cannot be loaded
+   */
+  public NewSeriesReaderWithoutValueFilter(Path seriesPath, TSDataType dataType, Filter timeFilter,
+      QueryContext context) throws StorageEngineException, IOException {
+    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+        .getQueryDataSource(seriesPath, context);
+
+    // path and context must be set before loadChunkMetadatas() is called below
+    this.path = seriesPath;
+    this.context = context;
+    this.dataType = dataType;
+    this.timeFilter = queryDataSource.setTTL(timeFilter);
+
+    // Copy the resource lists: entries are removed as files are consumed, and that must not
+    // alter the lists held by the QueryDataSource.
+    this.seqTsFiles = new ArrayList<>(queryDataSource.getSeqResources());
+    this.unseqTsFiles = new ArrayList<>(queryDataSource.getUnseqResources());
+
+    // Accumulate into fresh lists instead of appending to whatever list loadChunkMetadatas()
+    // returned (which may be owned by the metadata cache or the resource itself).
+    this.seqChunkMetadatas = new ArrayList<>();
+    this.unseqChunkMetadatas = new ArrayList<>();
+
+    // An empty resource list is fine: nothing is loaded and hasNextChunk() reports false.
+    loadAllChunkMetadatas(seqTsFiles, seqChunkMetadatas);
+    loadAllChunkMetadatas(unseqTsFiles, unseqChunkMetadatas);
+  }
+
+  /** Appends the chunk metadata of every file in {@code files} to {@code target}, then empties {@code files}. */
+  private void loadAllChunkMetadatas(List<TsFileResource> files, List<ChunkMetaData> target)
+      throws IOException {
+    for (TsFileResource resource : files) {
+      target.addAll(loadChunkMetadatas(resource));
+    }
+    files.clear();
+  }
+
+
+
+  /**
+   * Used by raw data queries. Reports whether the current chunk can serve another batch.
+   *
+   * Only a chunk that does not overlap the following chunk is served here; when there is no
+   * chunk, or the chunk overlaps its successor, this returns false.
+   *
+   * @return true if the reader of the current, non-overlapped chunk has another batch
+   * @throws IOException if chunk metadata or chunk data cannot be read
+   */
+  public boolean hasNextBatch() throws IOException {
+    if (!hasNextChunk() || isNextChunkOverlapped()) {
+      return false;
+    }
+    // isNextChunkOverlapped() only creates the chunk reader on the overlapped path, so on this
+    // path it may still be missing; create it before dereferencing it.
+    if (!isCurrentChunkReaderInit) {
+      initCurrentChunkReader();
+    }
+    return currentChunkReader.hasNextBatch();
+  }
+
+  /**
+   * Returns the next batch of the current chunk reader. Call hasNextBatch() first: it is the
+   * method that makes sure the chunk reader exists.
+   *
+   * @return the batch produced by the current chunk reader
+   * @throws IOException if the batch cannot be read
+   */
+  public BatchData nextBatch() throws IOException {
+    // Hand the batch to the caller; it used to be read and then dropped in favour of null.
+    return currentChunkReader.nextBatch();
+  }
+
+
+
+
+
+ /**
+ * for aggregation and group by
+ */
+
+ public boolean hasNextChunk() throws IOException {
+ if (hasCachedNextChunk) {
+ return true;
+ }
+
+ /**
+ * 只要 metadata 空了就从 resource 里补充一个
+ */
+ if (seqChunkMetadatas.isEmpty() && !seqTsFiles.isEmpty()) {
+ seqChunkMetadatas.addAll(loadChunkMetadatas(seqTsFiles.remove(0)));
+ }
+
+ if (unseqChunkMetadatas.isEmpty() && !unseqTsFiles.isEmpty()) {
+ unseqChunkMetadatas.addAll(loadChunkMetadatas(unseqTsFiles.remove(0)));
+ }
+
+ /**
+ * 拿顺序或乱序的第一个 ChunkMetadata,缓存起来
+ */
+ if (!seqChunkMetadatas.isEmpty() && unseqChunkMetadatas.isEmpty()) {
+ cachedChunkMetaData = seqChunkMetadatas.remove(0);
+ hasCachedNextChunk = true;
+ } else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
+ cachedChunkMetaData = unseqChunkMetadatas.remove(0);
+ hasCachedNextChunk = true;
+ } else if (!seqChunkMetadatas.isEmpty()) {
+ // seq 和 unseq 的 chunk metadata 都不为空
+ if (seqChunkMetadatas.get(0).getStartTime() <= unseqChunkMetadatas.get(0).getStartTime()) {
+ cachedChunkMetaData = seqChunkMetadatas.remove(0);
+ } else {
+ cachedChunkMetaData = unseqChunkMetadatas.remove(0);
+ }
+ hasCachedNextChunk = true;
+ } else {
+ // do not has chunk metadata in seq or unseq
+ hasCachedNextChunk = false;
+ }
+ return hasCachedNextChunk;
+ }
+
+  /**
+   * Loads the chunk metadata of one TsFileResource for this reader's series and applies the
+   * modifications recorded for the series in the resource's mod file.
+   *
+   * A closed file is read through DeviceMetaDataCache; an unclosed file uses
+   * resource.getChunkMetaDataList(). NOTE(review): the original comment said in-memory chunks
+   * are added for unclosed files, but nothing in this method does that.
+   *
+   * @param resource the TsFile resource to load from
+   * @return the list obtained from the cache or the resource, after modifications were applied
+   * @throws IOException if the metadata or the modifications cannot be read
+   */
+  private List<ChunkMetaData> loadChunkMetadatas(TsFileResource resource) throws IOException {
+    List<ChunkMetaData> chunkMetadatas = resource.isClosed()
+        ? DeviceMetaDataCache.getInstance().get(resource, path)
+        : resource.getChunkMetaDataList();
+
+    List<Modification> modifications = context
+        .getPathModifications(resource.getModFile(), path.getFullPath());
+    if (modifications.isEmpty()) {
+      return chunkMetadatas;
+    }
+    QueryUtils.modifyChunkMetaData(chunkMetadatas, modifications);
+    return chunkMetadatas;
+  }
+
+  /**
+   * Reports whether the cached chunk overlaps the chunk at the head of either metadata list,
+   * i.e. whether the cached chunk's end time is not before that head's start time. When it does
+   * overlap and no chunk reader has been created yet, one is created for the cached chunk.
+   *
+   * @return true if the cached chunk overlaps the next sequence or unsequence chunk
+   * @throws IOException if the chunk reader cannot be created
+   */
+  public boolean isNextChunkOverlapped() throws IOException {
+    boolean overlapsSeq = !seqChunkMetadatas.isEmpty()
+        && cachedChunkMetaData.getEndTime() >= seqChunkMetadatas.get(0).getStartTime();
+    boolean overlapsUnseq = !unseqChunkMetadatas.isEmpty()
+        && cachedChunkMetaData.getEndTime() >= unseqChunkMetadatas.get(0).getStartTime();
+    boolean overlapped = overlapsSeq || overlapsUnseq;
+
+    // initialise the chunk reader for the overlapped chunk
+    if (overlapped && !isCurrentChunkReaderInit) {
+      initCurrentChunkReader();
+    }
+    return overlapped;
+  }
+
+  /**
+   * Hands out the cached chunk metadata and clears the cache flag, looking for a chunk first
+   * if none is cached.
+   *
+   * @return the chunk metadata selected by hasNextChunk()
+   * @throws IOException with message "no more chunk metadata" when no chunk is left
+   */
+  public ChunkMetaData nextChunkMetadata() throws IOException {
+    if (!hasCachedNextChunk && !hasNextChunk()) {
+      throw new IOException("no more chunk metadata");
+    }
+    hasCachedNextChunk = false;
+    return cachedChunkMetaData;
+  }
+
+  /**
+   * Loads the chunk described by cachedChunkMetaData through that metadata's own chunk loader,
+   * wraps it in a ChunkReader using the time filter, and marks the reader as initialised.
+   *
+   * @throws IOException if the chunk cannot be loaded
+   */
+  private void initCurrentChunkReader() throws IOException {
+    Chunk loadedChunk = cachedChunkMetaData.getChunkLoader().getChunk(cachedChunkMetaData);
+    currentChunkReader = new ChunkReader(loadedChunk, timeFilter);
+    isCurrentChunkReaderInit = true;
+  }
+
+
+
+  /**
+   * Reports whether a page header is available, caching the next one from the current chunk
+   * reader when none is cached. The chunk reader is created on demand.
+   *
+   * @return true if a page header is cached and ready for nextPageHeader()
+   * @throws IOException if the chunk reader cannot be created or read
+   */
+  public boolean hasNextPage() throws IOException {
+    if (!hasCachedNextPage) {
+      if (!isCurrentChunkReaderInit) {
+        initCurrentChunkReader();
+      }
+      // The reader is always initialised at this point: initCurrentChunkReader() either sets
+      // the flag or throws.
+      if (currentChunkReader.hasNextBatch()) {
+        cachedPageHeader = currentChunkReader.nextPageHeader();
+        hasCachedNextPage = true;
+      }
+    }
+    return hasCachedNextPage;
+  }
+
+  /**
+   * Placeholder: no overlap detection is implemented for pages, so this always answers false.
+   * TODO(review): presumably meant to compare cachedPageHeader with the following pages.
+   *
+   * @return always false
+   */
+  public boolean isNextPageOverlapped() {
+    return false;
+  }
+
+
+  /**
+   * Drains points from the priority merge reader into a new batch until the reader is exhausted
+   * or the next point lies after the overlap bound.
+   *
+   * TODO(review): the bound is still the hard-coded placeholder 0, so only points with a
+   * timestamp of 0 or less are collected until a real end time is wired in.
+   *
+   * @return a batch of this reader's data type holding the collected points, possibly empty
+   * @throws IOException if the merge reader fails
+   */
+  public BatchData nextOverlappedPage() throws IOException {
+    BatchData batchData = new BatchData(dataType);
+    long latestDirectlyOverlappedPageEndTime = 0;
+    // "for (condition)" is not valid Java; this is a plain conditional loop.
+    while (priorityMergeReader.hasNext()) {
+      // Peek first so a point beyond the bound stays in the reader for a later call.
+      if (priorityMergeReader.current().getTimestamp() > latestDirectlyOverlappedPageEndTime) {
+        break;
+      }
+      // Consume the point; without advancing, the loop would see the same point forever.
+      TimeValuePair timeValuePair = priorityMergeReader.next();
+      batchData.putTime(timeValuePair.getTimestamp());
+      batchData.putAnObject(timeValuePair.getValue().getValue());
+    }
+    return batchData;
+  }
+
+
+
+  /**
+   * Hands out the cached page header and clears the cache flag, looking for a page first if
+   * none is cached.
+   *
+   * @return the page header selected by hasNextPage()
+   * @throws IOException with message "no next page header" when no page is left
+   */
+  public PageHeader nextPageHeader() throws IOException {
+    if (!hasCachedNextPage && !hasNextPage()) {
+      throw new IOException("no next page header");
+    }
+    hasCachedNextPage = false;
+    return cachedPageHeader;
+  }
+
+ /**
+ * Closes this reader. Currently a no-op: the body is empty, so nothing is released here.
+ */
+ public void close() throws IOException {
+
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
index ed71dc8..b6159a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
@@ -72,7 +72,7 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade
QueryContext context, boolean pushdownUnseq) throws StorageEngineException, IOException {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context);
- timeFilter = queryDataSource.updateTimeFilter(timeFilter);
+ timeFilter = queryDataSource.setTTL(timeFilter);
// reader for sequence resources
this.seqResourceIterateReader = new SeqResourceIterateReader(