You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/21 03:53:47 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
bwf with value filter range query
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 71b2f2d add bwf with value filter range query
new d222853 Merge branch 'feature_async_close_tsfile' of https://github.com/apache/incubator-iotdb into feature_async_close_tsfile
71b2f2d is described below
commit 71b2f2df86a01840da6e49089df4ab93da23d4fe
Author: suyue <23...@qq.com>
AuthorDate: Fri Jun 21 11:52:28 2019 +0800
add bwf with value filter range query
---
.../executor/EngineExecutorWithTimeGenerator.java | 2 +-
.../db/query/factory/SeriesReaderFactory.java | 38 ++++++
.../FileSeriesByTimestampIAggregateReader.java | 43 ++++++
.../sequence/SequenceDataReaderByTimestampV2.java | 147 +++++++++++++++++++++
.../UnSealedTsFilesReaderByTimestampV2.java | 78 +++++++++++
5 files changed, 307 insertions(+), 1 deletion(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 3b40187..9001ee0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -66,7 +66,7 @@ public class EngineExecutorWithTimeGenerator {
try {
timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context);
readersOfSelectedSeries = SeriesReaderFactory
- .getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), context);
+ .getByTimestampReadersOfSelectedPathsV2(queryExpression.getSelectedSeries(), context);
} catch (IOException ex) {
throw new FileNodeManagerException(ex);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index b731fc5..982aafc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestampV2;
import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReaderByTimestamp;
import org.apache.iotdb.db.utils.QueryUtils;
@@ -223,6 +225,42 @@ public class SeriesReaderFactory {
}
/**
+ * Construct one by-timestamp reader per selected path. Only sequential data is read so far; merging in unsequential data is still a TODO.
+ *
+ * @param paths selected series path
+ * @param context query context
+ * @return the list of EngineReaderByTimeStamp
+ */
+  public static List<EngineReaderByTimeStamp> getByTimestampReadersOfSelectedPathsV2(
+      List<Path> paths, QueryContext context) throws IOException, FileNodeManagerException {
+    // One merged by-timestamp reader is produced per path, in the order of the given paths.
+    List<EngineReaderByTimeStamp> readers = new ArrayList<>();
+
+    for (Path selectedPath : paths) {
+      QueryDataSourceV2 dataSource =
+          QueryResourceManager.getInstance().getQueryDataSourceV2(selectedPath, context);
+      PriorityMergeReaderByTimestamp mergedReader = new PriorityMergeReaderByTimestamp();
+
+      // Sequence data is registered with priority 1.
+      mergedReader.addReaderWithPriority(
+          new SequenceDataReaderByTimestampV2(
+              selectedPath, dataSource.getSeqDataSource().getQueryTsFiles(), context),
+          1);
+
+      // TODO: unsequence data is not merged in yet. The intended wiring is:
+      //   PriorityMergeReaderByTimestamp unSeqMergeReader = SeriesReaderFactory.getInstance()
+      //       .createUnSeqMergeReaderByTimestamp(dataSource.getOverflowSeriesDataSource());
+      //   mergedReader.addReaderWithPriority(unSeqMergeReader, 2);
+
+      readers.add(mergedReader);
+    }
+
+    return readers;
+  }
+
+ /**
* This method is used to create unsequence insert reader by timestamp for IoTDB request, such as
* query, aggregation and groupby request.
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java
new file mode 100644
index 0000000..810d649
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/FileSeriesByTimestampIAggregateReader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+/**
+ * Adapter that exposes a TsFile {@link SeriesReaderByTimestamp} through the engine-level
+ * {@link EngineReaderByTimeStamp} interface by forwarding every call to it.
+ */
+public class FileSeriesByTimestampIAggregateReader implements EngineReaderByTimeStamp {
+
+  /** The wrapped reader that every call is forwarded to. */
+  private final SeriesReaderByTimestamp delegate;
+
+  /**
+   * Wraps the given reader.
+   *
+   * @param seriesReaderByTimestamp the reader to forward calls to
+   */
+  public FileSeriesByTimestampIAggregateReader(SeriesReaderByTimestamp seriesReaderByTimestamp) {
+    delegate = seriesReaderByTimestamp;
+  }
+
+  /** Forwards to the wrapped reader and returns its result unchanged. */
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    Object value = delegate.getValueInTimestamp(timestamp);
+    return value;
+  }
+
+  /** Forwards to the wrapped reader and returns its result unchanged. */
+  @Override
+  public boolean hasNext() throws IOException {
+    boolean more = delegate.hasNext();
+    return more;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java
new file mode 100644
index 0000000..d3abac2
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampV2.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+/**
+ * By-timestamp reader over the sequence TsFiles of one series.
+ *
+ * <p>The files are consumed in list order: {@code nextIntervalFileIndex} is the index of the next
+ * file that has not been opened yet, and {@code seriesReader} is the reader over the most recently
+ * opened file (or {@code null} before any file was opened). Closed files are read through
+ * {@link FileSeriesByTimestampIAggregateReader}, unclosed files through
+ * {@link UnSealedTsFilesReaderByTimestampV2}.
+ */
+public class SequenceDataReaderByTimestampV2 implements EngineReaderByTimeStamp {
+
+  protected Path seriesPath;
+  private final List<TsFileResourceV2> tsFileResourceV2List;
+  private int nextIntervalFileIndex;
+  protected EngineReaderByTimeStamp seriesReader;
+  private final QueryContext context;
+
+  /**
+   * Init with the series path and the TsFile resources to read; the list may hold both closed and
+   * unclosed files.
+   *
+   * @param seriesPath path of the series to read
+   * @param tsFileResourceV2List resources to read, visited in list order
+   * @param context query context, used to look up the modifications of closed files
+   */
+  public SequenceDataReaderByTimestampV2(Path seriesPath,
+      List<TsFileResourceV2> tsFileResourceV2List,
+      QueryContext context) {
+    this.seriesPath = seriesPath;
+    this.tsFileResourceV2List = tsFileResourceV2List;
+    this.nextIntervalFileIndex = 0;
+    this.seriesReader = null;
+    this.context = context;
+  }
+
+  /**
+   * Asks the current reader for the value at {@code timestamp}; if it has none and is exhausted,
+   * moves on to the next candidate file (see {@link #constructReader(long)}) and asks once more.
+   *
+   * @return the value found, or {@code null} if neither attempt produced one
+   */
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    Object value = null;
+    if (seriesReader != null) {
+      value = seriesReader.getValueInTimestamp(timestamp);
+      if (value != null || seriesReader.hasNext()) {
+        return value;
+      }
+    }
+    constructReader(timestamp);
+    if (seriesReader != null) {
+      value = seriesReader.getValueInTimestamp(timestamp);
+      // Both outcomes return the same value; the hasNext() call is kept so the sequence of calls
+      // made on the underlying reader stays exactly as before.
+      if (value != null || seriesReader.hasNext()) {
+        return value;
+      }
+    }
+
+    return value;
+  }
+
+  /**
+   * Reports whether the current reader, or any file not opened yet, still has data. As a side
+   * effect this opens the following files one by one until one of them has data.
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    if (seriesReader != null && seriesReader.hasNext()) {
+      return true;
+    }
+    while (nextIntervalFileIndex < tsFileResourceV2List.size()) {
+      TsFileResourceV2 tsFile = tsFileResourceV2List.get(nextIntervalFileIndex);
+      nextIntervalFileIndex++;
+      // Pick the reader by file state, the same way constructReader does; previously unclosed
+      // files were opened with the sealed-file reader here.
+      if (tsFile.isClosed()) {
+        initSealedTsFileReader(tsFile, context);
+      } else {
+        initUnSealedTsFileReader(tsFile, context);
+      }
+      if (seriesReader.hasNext()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Construct reader with the file that might overlap this timestamp: advances through the
+   * remaining files and opens the first unclosed file, or the first closed file accepted by
+   * {@link #singleTsFileSatisfied}. Leaves {@code seriesReader} untouched if no file qualifies.
+   */
+  private void constructReader(long timestamp) throws IOException {
+    while (nextIntervalFileIndex < tsFileResourceV2List.size()) {
+      TsFileResourceV2 tsFile = tsFileResourceV2List.get(nextIntervalFileIndex);
+      nextIntervalFileIndex++;
+      if (!tsFile.isClosed()) {
+        initUnSealedTsFileReader(tsFile, context);
+        return;
+      }
+      if (singleTsFileSatisfied(tsFile, timestamp)) {
+        initSealedTsFileReader(tsFile, context);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Judge whether the file may contain the timestamp, i.e. must NOT be skipped.
+   *
+   * @return {@code true} for an unclosed file, or for a closed file whose recorded end time of
+   *     this series' device is not before {@code timestamp}; {@code false} otherwise, including
+   *     when the closed file records no end time for the device
+   */
+  private boolean singleTsFileSatisfied(TsFileResourceV2 fileNode, long timestamp) {
+    if (!fileNode.isClosed()) {
+      return true;
+    }
+    // A missing entry would otherwise fail with a NullPointerException on unboxing.
+    Long endTime = fileNode.getEndTimeMap().get(seriesPath.getDevice());
+    return endTime != null && endTime >= timestamp;
+  }
+
+  /** Points {@code seriesReader} at a reader over the given unclosed file. */
+  private void initUnSealedTsFileReader(TsFileResourceV2 tsFile, QueryContext context)
+      throws IOException {
+    seriesReader = new UnSealedTsFilesReaderByTimestampV2(tsFile);
+  }
+
+  /**
+   * Points {@code seriesReader} at a reader over the given closed file, after applying the
+   * modifications recorded for this series to the chunk metadata.
+   */
+  private void initSealedTsFileReader(TsFileResourceV2 fileNode, QueryContext context)
+      throws IOException {
+
+    // to avoid too many opened files
+    // NOTE(review): the second argument presumably marks the file as closed - confirm.
+    TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
+        .get(fileNode.getFile().getPath(), true);
+
+    MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
+    List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+
+    List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
+        seriesPath.getFullPath());
+    if (!pathModifications.isEmpty()) {
+      QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+    }
+    ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
+
+    seriesReader = new FileSeriesByTimestampIAggregateReader(
+        new SeriesReaderByTimestamp(chunkLoader, metaDataList));
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java
new file mode 100644
index 0000000..3a93346
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestampV2.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.sequence;
+
+import java.io.IOException;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+
+/**
+ * By-timestamp reader over one unclosed TsFile. It consults the chunks already listed in the
+ * resource's chunk metadata first and, once that reader is exhausted, the resource's read-only
+ * memory chunk.
+ */
+public class UnSealedTsFilesReaderByTimestampV2 implements EngineReaderByTimeStamp {
+
+  // Never assigned in this class; kept because it is visible to subclasses.
+  protected Path seriesPath;
+  private final SeriesReaderByTimestamp unSealedReader;
+  private final EngineReaderByTimeStamp memSeriesReader;
+  // Set once unSealedReader returned no value and reported no further data.
+  private boolean unSealedReaderEnded;
+
+  /**
+   * Constructor for UnSealedTsFileReader.
+   *
+   * @param tsFileResource unclosed tsfile resource
+   * @throws IOException if the file reader cannot be obtained or the readers cannot be built
+   */
+  public UnSealedTsFilesReaderByTimestampV2(TsFileResourceV2 tsFileResource) throws IOException {
+    // NOTE(review): the second argument presumably marks the file as not closed - confirm.
+    TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
+        .get(tsFileResource.getFile().getPath(), false);
+    ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
+    unSealedReader = new SeriesReaderByTimestamp(chunkLoader,
+        tsFileResource.getChunkMetaDatas());
+
+    memSeriesReader = new MemChunkReaderByTimestamp(tsFileResource.getReadOnlyMemChunk());
+    unSealedReaderEnded = false;
+  }
+
+  /**
+   * Returns the value at {@code timestamp}. While the on-disk reader is still active its answer is
+   * returned, even if that answer is {@code null}; once it has no value and no further data it is
+   * marked ended and this and all later calls go to the memory chunk reader only.
+   */
+  @Override
+  public Object getValueInTimestamp(long timestamp) throws IOException {
+    if (!unSealedReaderEnded) {
+      Object value = unSealedReader.getValueInTimestamp(timestamp);
+      if (value != null || unSealedReader.hasNext()) {
+        return value;
+      }
+      // Exhausted: stop consulting the on-disk reader, as hasNext() already does.
+      unSealedReaderEnded = true;
+    }
+    return memSeriesReader.getValueInTimestamp(timestamp);
+  }
+
+  /** Reports whether the on-disk reader (unless already ended) or the memory reader has data. */
+  @Override
+  public boolean hasNext() throws IOException {
+    if (!unSealedReaderEnded && unSealedReader.hasNext()) {
+      return true;
+    }
+    return memSeriesReader.hasNext();
+  }
+
+}