You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/23 02:19:25 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: update (#200)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 1caeab6 update (#200)
1caeab6 is described below
commit 1caeab6e7f908eaea8e7e5bf8f4f11886d9118d5
Author: RUI, LEI <33...@users.noreply.github.com>
AuthorDate: Sun Jun 23 10:19:21 2019 +0800
update (#200)
---
.../groupby/GroupByWithOnlyTimeFilterDataSet.java | 11 +-
.../groupby/GroupByWithValueFilterDataSet.java | 2 +-
.../db/query/executor/AggregateEngineExecutor.java | 11 +-
.../executor/EngineExecutorWithTimeGenerator.java | 9 +-
.../db/query/factory/ISeriesReaderFactory.java | 6 +-
.../db/query/factory/SeriesReaderFactory.java | 4 +-
.../db/query/factory/SeriesReaderFactoryImpl.java | 166 ++++++++++++++++++---
.../query/reader/unsequence/EngineChunkReader.java | 10 +-
8 files changed, 166 insertions(+), 53 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
index 873682b..fb19694 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.query.dataset.groupby;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -44,6 +41,10 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
private List<IPointReader> unSequenceReaderList;
@@ -91,8 +92,8 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
// unseq reader for all chunk groups in unSeqFile, memory
IPointReader unSeqMergeReader = SeriesReaderFactoryImpl.getInstance()
- .createUnSeqReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(),
- timeFilter);
+ .createUnSeqReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(), context,
+ timeFilter);
sequenceReaderList.add(sequenceReader);
unSequenceReaderList.add(unSeqMergeReader);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index fc2a1b4..cbaad17 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -70,7 +70,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
* init reader and aggregate function.
*/
public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
- throws FileNodeManagerException, PathErrorException, ProcessorException {
+ throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
initAggreFuction(aggres);
QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index a608127..c6fb944 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.query.executor;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -52,6 +49,10 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
public class AggregateEngineExecutor {
private List<Path> selectedSeries;
@@ -114,8 +115,8 @@ public class AggregateEngineExecutor {
// unseq reader for all chunk groups in unSeqFile, memory
IPointReader unSeqMergeReader = SeriesReaderFactoryImpl.getInstance()
- .createUnSeqReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(),
- timeFilter);
+ .createUnSeqReader(queryDataSource.getSeriesPath(), queryDataSource.getUnseqResources(), context,
+ timeFilter);
readersOfSequenceData.add(sequenceReader);
readersOfUnSequenceData.add(unSeqMergeReader);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 3deb7c1..c7cdde6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.query.executor;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -36,6 +33,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* IoTDB query executor with value filter.
*/
@@ -53,7 +54,7 @@ public class EngineExecutorWithTimeGenerator {
* @return QueryDataSet object
* @throws FileNodeManagerException FileNodeManagerException
*/
- public QueryDataSet execute(QueryContext context) throws FileNodeManagerException {
+ public QueryDataSet execute(QueryContext context) throws FileNodeManagerException, IOException {
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
index ebbf7ff..416bae6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
@@ -40,8 +40,8 @@ public interface ISeriesReaderFactory {
* This method is used to read all unsequence data for IoTDB request, such as query, aggregation
* and groupby request.
*/
- IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
- Filter filter) throws IOException;
+ IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
+ Filter filter) throws IOException;
/**
@@ -52,7 +52,7 @@ public interface ISeriesReaderFactory {
* @return the list of EngineReaderByTimeStamp
*/
List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
- QueryContext context) throws FileNodeManagerException;
+ QueryContext context) throws FileNodeManagerException, IOException;
/**
* construct IPointReader, include sequential data and unsequential data.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 2fa34f4..60f02a0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -117,9 +117,7 @@ public class SeriesReaderFactory {
ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
: new ChunkReaderWithoutFilter(chunk);
- unSeqMergeReader
- .addReaderWithPriority(new EngineChunkReader(chunkReader, unClosedTsFileReader),
- priorityValue);
+ unSeqMergeReader.addReaderWithPriority(new EngineChunkReader(chunkReader), priorityValue);
priorityValue++;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
index 40969f7..a5a1651 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
@@ -18,26 +18,46 @@
*/
package org.apache.iotdb.db.query.factory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
+import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestampV2;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderV2;
+import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReaderByTimestamp;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
private static final Logger logger = LoggerFactory.getLogger(SeriesReaderFactory.class);
@@ -50,14 +70,119 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
}
@Override
- public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
- Filter filter) throws IOException {
- return null;
+ public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
+ Filter filter) throws IOException {
+ PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
+
+ int priorityValue = 1;
+
+ for (TsFileResourceV2 tsFileResourceV2 : unSeqResources) {
+
+ // store only one opened file stream into manager, to avoid too many opened files
+ TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
+ .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+
+ // get modified chunk metadatas
+ List<ChunkMetaData> metaDataList;
+ if (tsFileResourceV2.isClosed()) {
+ MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
+ metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+ // mod
+ List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+ seriesPath.getFullPath());
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+ }
+ } else {
+ metaDataList = tsFileResourceV2.getChunkMetaDatas();
+ }
+
+ // add readers for chunks
+ // TODO consider reusing the chunk metadata filtering logic below
+ ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader);
+ for (ChunkMetaData chunkMetaData : metaDataList) {
+
+ DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
+ chunkMetaData.getEndTime(),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
+ chunkMetaData.getTsDataType());
+
+ if (filter != null && !filter.satisfy(digest)) {
+ continue;
+ }
+
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
+ : new ChunkReaderWithoutFilter(chunk);
+
+ unSeqMergeReader.addReaderWithPriority(new EngineChunkReader(chunkReader), priorityValue);
+
+ priorityValue++;
+ }
+
+ // add reader for MemTable
+ if (!tsFileResourceV2.isClosed()) {
+ unSeqMergeReader.addReaderWithPriority(new MemChunkReader(tsFileResourceV2.getReadOnlyMemChunk(), filter), priorityValue++);
+ }
+ }
+
+ // TODO add external sort when needed
+ return unSeqMergeReader;
+ }
+
+ private PriorityMergeReaderByTimestamp createUnSeqByTimestampReader(Path seriesPath,
+ List<TsFileResourceV2> unSeqResources, QueryContext context) throws IOException {
+ PriorityMergeReaderByTimestamp unSeqMergeReader = new PriorityMergeReaderByTimestamp();
+
+ int priorityValue = 1;
+
+ for (TsFileResourceV2 tsFileResourceV2 : unSeqResources) {
+
+ // store only one opened file stream into manager, to avoid too many opened files
+ TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
+ .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+
+ List<ChunkMetaData> metaDataList;
+ if (tsFileResourceV2.isClosed()) {
+ MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
+ metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+ // mod
+ List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+ seriesPath.getFullPath());
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+ }
+ } else {
+ metaDataList = tsFileResourceV2.getChunkMetaDatas();
+ }
+
+ // add reader for chunk
+ ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader);
+ for (ChunkMetaData chunkMetaData : metaDataList) {
+
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
+
+ unSeqMergeReader.addReaderWithPriority(new EngineChunkReaderByTimestamp(chunkReader),
+ priorityValue);
+
+ priorityValue++;
+ }
+
+ // add reader for MemTable
+ if (!tsFileResourceV2.isClosed()) {
+ unSeqMergeReader.addReaderWithPriority(new MemChunkReaderByTimestamp(tsFileResourceV2.getReadOnlyMemChunk()), priorityValue++);
+ }
+ }
+
+ // TODO add external sort when needed
+ return unSeqMergeReader;
}
@Override
public List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
- QueryContext context) throws FileNodeManagerException {
+ QueryContext context) throws FileNodeManagerException, IOException {
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
for (Path path : paths) {
@@ -65,8 +190,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
QueryDataSourceV2 queryDataSource = null;
try {
queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceV2(path,
- context);
+ .getQueryDataSourceV2(path,
+ context);
} catch (ProcessorException e) {
throw new FileNodeManagerException(e);
}
@@ -75,13 +200,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// reader for sequence data
SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
- queryDataSource.getSeqResources(), context);
+ queryDataSource.getSeqResources(), context);
mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
// reader for unSequence data
//TODO add create unseq reader
- PriorityMergeReaderByTimestamp unSeqMergeReader = createUnSeqByTimestampReader(
- queryDataSource.getUnseqResources());
+ PriorityMergeReaderByTimestamp unSeqMergeReader = createUnSeqByTimestampReader(path,
+ queryDataSource.getUnseqResources(), context);
mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
readersOfSelectedSeries.add(mergeReaderByTimestamp);
@@ -90,18 +215,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
return readersOfSelectedSeries;
}
- private PriorityMergeReaderByTimestamp createUnSeqByTimestampReader(
- List<TsFileResourceV2> unseqResources) {
- return null;
- }
-
@Override
public IPointReader createAllDataReader(Path path, Filter timeFilter, QueryContext context)
- throws FileNodeManagerException, IOException {
+ throws FileNodeManagerException, IOException {
QueryDataSourceV2 queryDataSource = null;
try {
queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceV2(path, context);
+ .getQueryDataSourceV2(path, context);
} catch (ProcessorException e) {
throw new FileNodeManagerException(e);
}
@@ -109,13 +229,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// sequence reader for one sealed tsfile
SequenceDataReaderV2 tsFilesReader;
- tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
- queryDataSource.getSeqResources(),
- timeFilter, context);
+ tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
+ queryDataSource.getSeqResources(),
+ timeFilter, context);
// unseq reader for all chunk groups in unSeqFile
IPointReader unSeqMergeReader = null;
- unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), timeFilter);
+ unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context, timeFilter);
if (!tsFilesReader.hasNext()) {
//only have unsequence data.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java
index 2465cbd..04a1fce 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java
@@ -32,15 +32,8 @@ public class EngineChunkReader implements IPointReader {
private ChunkReader chunkReader;
private BatchData data;
- /**
- * Each EngineChunkReader has a corresponding UnClosedTsFileReader, when EngineChunkReader is
- * closed, UnClosedTsFileReader also should be closed in meanwhile.
- */
- private TsFileSequenceReader unClosedTsFileReader;
-
- public EngineChunkReader(ChunkReader chunkReader, TsFileSequenceReader unClosedTsFileReader) {
+ public EngineChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
- this.unClosedTsFileReader = unClosedTsFileReader;
}
@Override
@@ -72,6 +65,5 @@ public class EngineChunkReader implements IPointReader {
@Override
public void close() throws IOException {
this.chunkReader.close();
- this.unClosedTsFileReader.close();
}
}