You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/06/24 15:31:57 UTC
[incubator-iotdb] 01/03: fix bug of jira-121
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch qovc
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 055092562495f8c120ba6c2d74313f4dc4178b1c
Author: suyue <23...@qq.com>
AuthorDate: Mon Jun 24 22:50:39 2019 +0800
fix bug of jira-121
---
.../EngineExecutorWithoutTimeGenerator.java | 3 +-
.../db/query/factory/ISeriesReaderFactory.java | 27 ++++--
.../db/query/factory/SeriesReaderFactoryImpl.java | 99 +++++++++++++++-------
.../java/org/apache/iotdb/db/query/fill/IFill.java | 2 +-
.../query/reader/AllDataReaderWithValueFilter.java | 74 ++++++++++++++++
.../query/timegenerator/EngineNodeConstructor.java | 3 +-
6 files changed, 170 insertions(+), 38 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 34d93cc..14b0b2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -97,6 +97,7 @@ public class EngineExecutorWithoutTimeGenerator {
throw new FileNodeManagerException(e);
}
- return SeriesReaderFactoryImpl.getInstance().createAllDataReader(path, timeFilter, context);
+ return SeriesReaderFactoryImpl.getInstance()
+ .createTimeFilterAllDataReader(path, timeFilter, context);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
index 416bae6..0bbefb1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/ISeriesReaderFactory.java
@@ -40,8 +40,9 @@ public interface ISeriesReaderFactory {
* This method is used to read all unsequence data for IoTDB request, such as query, aggregation
* and groupby request.
*/
- IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
- Filter filter) throws IOException;
+ IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
+ QueryContext context,
+ Filter filter) throws IOException;
/**
@@ -55,12 +56,28 @@ public interface ISeriesReaderFactory {
QueryContext context) throws FileNodeManagerException, IOException;
/**
- * construct IPointReader, include sequential data and unsequential data.
+ * Constructs an IPointReader with <b>only a time filter or no filter</b>, covering both
+ * sequential and unsequential data. This reader does not filter the merged result of the
+ * sequential and unsequential data readers.
*
* @param path selected series path
+ * @param timeFilter time filter or null
* @param context query context
- * @return the list of EngineReaderByTimeStamp
+ * @return data reader including seq and unseq data source.
*/
- IPointReader createAllDataReader(Path path, Filter timeFilter,
+ IPointReader createTimeFilterAllDataReader(Path path, Filter timeFilter,
QueryContext context) throws FileNodeManagerException, IOException;
+
+ /**
+ * Constructs an IPointReader with a <b>value filter</b>, covering both sequential and
+ * unsequential data. This reader filters the merged result of the sequential and unsequential
+ * data readers, so if there is only a time filter, call createTimeFilterAllDataReader() instead.
+ *
+ * @param path selected series path
+ * @param filter value filter used to filter the merged data
+ * @param context query context
+ * @return data reader including seq and unseq data source.
+ */
+ IPointReader createValueFilterAllDataReader(Path path, Filter filter, QueryContext context)
+ throws FileNodeManagerException, IOException;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
index a5a1651..2b45073 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactoryImpl.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.query.factory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
@@ -27,6 +30,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.AllDataReaderWithValueFilter;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
@@ -54,10 +58,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
private static final Logger logger = LoggerFactory.getLogger(SeriesReaderFactory.class);
@@ -70,8 +70,9 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
}
@Override
- public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources, QueryContext context,
- Filter filter) throws IOException {
+ public IPointReader createUnSeqReader(Path seriesPath, List<TsFileResourceV2> unSeqResources,
+ QueryContext context,
+ Filter filter) throws IOException {
PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
int priorityValue = 1;
@@ -80,7 +81,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// store only one opened file stream into manager, to avoid too many opened files
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
- .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+ .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
// get modified chunk metadatas
List<ChunkMetaData> metaDataList;
@@ -88,7 +89,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
// mod
- List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+ List<Modification> pathModifications = context
+ .getPathModifications(tsFileResourceV2.getModFile(),
seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
@@ -103,10 +105,10 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
for (ChunkMetaData chunkMetaData : metaDataList) {
DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
- chunkMetaData.getEndTime(),
- chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
- chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
- chunkMetaData.getTsDataType());
+ chunkMetaData.getEndTime(),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
+ chunkMetaData.getTsDataType());
if (filter != null && !filter.satisfy(digest)) {
continue;
@@ -114,7 +116,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
- : new ChunkReaderWithoutFilter(chunk);
+ : new ChunkReaderWithoutFilter(chunk);
unSeqMergeReader.addReaderWithPriority(new EngineChunkReader(chunkReader), priorityValue);
@@ -123,7 +125,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// add reader for MemTable
if (!tsFileResourceV2.isClosed()) {
- unSeqMergeReader.addReaderWithPriority(new MemChunkReader(tsFileResourceV2.getReadOnlyMemChunk(), filter), priorityValue++);
+ unSeqMergeReader.addReaderWithPriority(
+ new MemChunkReader(tsFileResourceV2.getReadOnlyMemChunk(), filter), priorityValue++);
}
}
@@ -132,7 +135,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
}
private PriorityMergeReaderByTimestamp createUnSeqByTimestampReader(Path seriesPath,
- List<TsFileResourceV2> unSeqResources, QueryContext context) throws IOException {
+ List<TsFileResourceV2> unSeqResources, QueryContext context) throws IOException {
PriorityMergeReaderByTimestamp unSeqMergeReader = new PriorityMergeReaderByTimestamp();
int priorityValue = 1;
@@ -141,14 +144,15 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// store only one opened file stream into manager, to avoid too many opened files
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
- .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
+ .get(tsFileResourceV2.getFile().getPath(), tsFileResourceV2.isClosed());
List<ChunkMetaData> metaDataList;
if (tsFileResourceV2.isClosed()) {
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
// mod
- List<Modification> pathModifications = context.getPathModifications(tsFileResourceV2.getModFile(),
+ List<Modification> pathModifications = context
+ .getPathModifications(tsFileResourceV2.getModFile(),
seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
@@ -165,14 +169,15 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
unSeqMergeReader.addReaderWithPriority(new EngineChunkReaderByTimestamp(chunkReader),
- priorityValue);
+ priorityValue);
priorityValue++;
}
// add reader for MemTable
if (!tsFileResourceV2.isClosed()) {
- unSeqMergeReader.addReaderWithPriority(new MemChunkReaderByTimestamp(tsFileResourceV2.getReadOnlyMemChunk()), priorityValue++);
+ unSeqMergeReader.addReaderWithPriority(
+ new MemChunkReaderByTimestamp(tsFileResourceV2.getReadOnlyMemChunk()), priorityValue++);
}
}
@@ -182,7 +187,7 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
@Override
public List<EngineReaderByTimeStamp> createByTimestampReadersOfSelectedPaths(List<Path> paths,
- QueryContext context) throws FileNodeManagerException, IOException {
+ QueryContext context) throws FileNodeManagerException, IOException {
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
for (Path path : paths) {
@@ -190,8 +195,8 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
QueryDataSourceV2 queryDataSource = null;
try {
queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceV2(path,
- context);
+ .getQueryDataSourceV2(path,
+ context);
} catch (ProcessorException e) {
throw new FileNodeManagerException(e);
}
@@ -200,13 +205,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
// reader for sequence data
SequenceDataReaderByTimestampV2 tsFilesReader = new SequenceDataReaderByTimestampV2(path,
- queryDataSource.getSeqResources(), context);
+ queryDataSource.getSeqResources(), context);
mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
// reader for unSequence data
//TODO add create unseq reader
PriorityMergeReaderByTimestamp unSeqMergeReader = createUnSeqByTimestampReader(path,
- queryDataSource.getUnseqResources(), context);
+ queryDataSource.getUnseqResources(), context);
mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2);
readersOfSelectedSeries.add(mergeReaderByTimestamp);
@@ -216,12 +221,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
}
@Override
- public IPointReader createAllDataReader(Path path, Filter timeFilter, QueryContext context)
- throws FileNodeManagerException, IOException {
+ public IPointReader createTimeFilterAllDataReader(Path path, Filter timeFilter,
+ QueryContext context)
+ throws FileNodeManagerException, IOException {
QueryDataSourceV2 queryDataSource = null;
try {
queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceV2(path, context);
+ .getQueryDataSourceV2(path, context);
} catch (ProcessorException e) {
throw new FileNodeManagerException(e);
}
@@ -230,12 +236,13 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
SequenceDataReaderV2 tsFilesReader;
tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
- queryDataSource.getSeqResources(),
- timeFilter, context);
+ queryDataSource.getSeqResources(),
+ timeFilter, context);
// unseq reader for all chunk groups in unSeqFile
IPointReader unSeqMergeReader = null;
- unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context, timeFilter);
+ unSeqMergeReader = createUnSeqReader(path, queryDataSource.getUnseqResources(), context,
+ timeFilter);
if (!tsFilesReader.hasNext()) {
//only have unsequence data.
@@ -246,6 +253,38 @@ public class SeriesReaderFactoryImpl implements ISeriesReaderFactory {
}
}
+  /**
+   * Creates a reader over the sequence and unsequence data of {@code path} whose output is
+   * restricted by a value filter.
+   *
+   * <p>The filter is handed to the sequence reader, while the unsequence reader is created
+   * without a filter; the merged stream is then filtered by {@link AllDataReaderWithValueFilter}.
+   * The merged reader is always wrapped: returning the raw unsequence reader when there is no
+   * sequence data would emit points that were never checked against {@code filter}.
+   *
+   * @param path selected series path
+   * @param filter value filter applied to the merged data
+   * @param context query context
+   * @return filtered point reader over sequence and unsequence data
+   * @throws FileNodeManagerException if the query data source cannot be obtained
+   * @throws IOException if an underlying reader cannot be created
+   */
+  @Override
+  public IPointReader createValueFilterAllDataReader(Path path, Filter filter, QueryContext context)
+      throws FileNodeManagerException, IOException {
+    QueryDataSourceV2 queryDataSource;
+    try {
+      queryDataSource = QueryResourceManager.getInstance().getQueryDataSourceV2(path, context);
+    } catch (ProcessorException e) {
+      throw new FileNodeManagerException(e);
+    }
+
+    // reader over the sequence resources, with the filter handed to it
+    SequenceDataReaderV2 tsFilesReader = new SequenceDataReaderV2(queryDataSource.getSeriesPath(),
+        queryDataSource.getSeqResources(), filter, context);
+
+    // unseq reader for all chunk groups in unSeqFile. Filter for unSeqMergeReader is null, because
+    // we won't push down filter in unsequence data source.
+    IPointReader unSeqMergeReader =
+        createUnSeqReader(path, queryDataSource.getUnseqResources(), context, null);
+
+    // Always apply the value filter to the merged result, including the case where only
+    // unsequence data exists (that data was read without any filter above).
+    // NOTE(review): assumes AllDataReader copes with a sequence reader that has no data - confirm.
+    return new AllDataReaderWithValueFilter(tsFilesReader, unSeqMergeReader, filter);
+  }
+
private static class SeriesReaderFactoryHelper {
private static final SeriesReaderFactoryImpl INSTANCE = new SeriesReaderFactoryImpl();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index c720500..e8244af 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -54,7 +54,7 @@ public abstract class IFill {
throws IOException, FileNodeManagerException {
Filter timeFilter = constructFilter(beforeRange);
allDataReader = SeriesReaderFactoryImpl.getInstance()
- .createAllDataReader(path, timeFilter, context);
+ .createTimeFilterAllDataReader(path, timeFilter, context);
}
public abstract IPointReader getFillResult() throws IOException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java
new file mode 100644
index 0000000..8e5e632
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReaderWithValueFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader;
+
+import java.io.IOException;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * Point reader over merged sequence and unsequence data that only emits the points accepted by
+ * a filter. Points are pulled from {@link AllDataReader} and tested one at a time.
+ */
+public class AllDataReaderWithValueFilter extends AllDataReader {
+
+  /** Filter every merged point is tested against. */
+  private Filter filter;
+  /** True while {@code cachedPair} holds an accepted point that has not been handed out yet. */
+  private boolean hasCachedValue;
+  /** Most recent point fetched from the parent reader. */
+  private TimeValuePair cachedPair;
+
+  /**
+   * Builds a filtering reader on top of the given sequence and unsequence readers.
+   *
+   * @param batchReader reader passed to the parent as its batch reader
+   * @param pointReader reader passed to the parent as its point reader
+   * @param filter filter applied to each merged point
+   */
+  public AllDataReaderWithValueFilter(IBatchReader batchReader, IPointReader pointReader,
+      Filter filter) {
+    super(batchReader, pointReader);
+    this.filter = filter;
+  }
+
+  /**
+   * Advances the parent reader until a point satisfying the filter is found and caches it.
+   *
+   * @return true if an accepted point is cached and ready for {@link #next()}
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    // keep pulling merged points until one passes the filter or the parent is exhausted
+    while (!hasCachedValue && super.hasNext()) {
+      cachedPair = super.next();
+      hasCachedValue =
+          filter.satisfy(cachedPair.getTimestamp(), cachedPair.getValue().getValue());
+    }
+    return hasCachedValue;
+  }
+
+  /**
+   * Returns the next accepted point and clears the cache flag.
+   *
+   * @throws IOException if no accepted point remains
+   */
+  @Override
+  public TimeValuePair next() throws IOException {
+    if (!hasCachedValue && !hasNext()) {
+      throw new IOException("data reader is out of bound.");
+    }
+    hasCachedValue = false;
+    return cachedPair;
+  }
+
+
+  /**
+   * Returns the most recently fetched point without advancing; this is null until a point has
+   * been fetched from the parent reader.
+   */
+  @Override
+  public TimeValuePair current() throws IOException {
+    return cachedPair;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index cc68ec2..4ca159b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -51,7 +51,8 @@ public class EngineNodeConstructor extends AbstractNodeConstructor {
Filter filter = ((SingleSeriesExpression) expression).getFilter();
Path path = ((SingleSeriesExpression) expression).getSeriesPath();
return new EngineLeafNode(
- SeriesReaderFactoryImpl.getInstance().createAllDataReader(path, filter, context));
+ SeriesReaderFactoryImpl.getInstance()
+ .createValueFilterAllDataReader(path, filter, context));
} catch (IOException e) {
throw new FileNodeManagerException(e);
}