You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/11 02:44:28 UTC
[iotdb] branch new_vector updated: try to adapt Last query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new c2b67a7 try to adapt Last query
c2b67a7 is described below
commit c2b67a7b2f8eef0105f0f079d80988aca12298b5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Nov 11 10:43:53 2021 +0800
try to adapt Last query
---
.../apache/iotdb/db/metadata/path/AlignedPath.java | 13 +++++
.../iotdb/db/metadata/path/MeasurementPath.java | 13 +++++
.../apache/iotdb/db/metadata/path/PartialPath.java | 11 ++++
.../iotdb/db/query/executor/LastQueryExecutor.java | 36 +++++++-------
.../executor/fill/AlignedLastPointReader.java | 58 ++++++++++++++++++++++
.../db/query/executor/fill/LastPointReader.java | 44 ++++++++++------
.../tsfile/read/reader/page/AlignedPageReader.java | 5 +-
7 files changed, 145 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 57262cb..0cfd926 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
@@ -210,6 +211,18 @@ public class AlignedPath extends PartialPath {
}
@Override
+ public AlignedLastPointReader createLastPointReader(
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter) {
+ return new AlignedLastPointReader(
+ this, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+ }
+
+ @Override
public AlignedSeriesReader createSeriesReader(
Set<String> allSensors,
TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 840b8c2..b9edd98 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
@@ -124,6 +125,18 @@ public class MeasurementPath extends PartialPath {
return isUnderAlignedEntity ? new AlignedPath(this) : this;
}
+ @Override
+ public LastPointReader createLastPointReader(
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter) {
+ return new LastPointReader(
+ this, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+ }
+
public SeriesReader createSeriesReader(
Set<String> allSensors,
TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 47d348e..eb6e0fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
@@ -371,6 +372,16 @@ public class PartialPath extends Path implements Comparable<Path> {
return ret;
}
+ public LastPointReader createLastPointReader(
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter) {
+ throw new UnsupportedOperationException("Should call exact sub class!");
+ }
+
public SeriesReader createSeriesReader(
Set<String> allSensors,
TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 7931c54..5da5cc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -175,15 +175,16 @@ public class LastQueryExecutor {
QueryResourceManager.getInstance()
.getQueryDataSource(nonCachedPaths.get(i), context, null);
LastPointReader lastReader =
- new LastPointReader(
- nonCachedPaths.get(i),
- nonCachedDataTypes.get(i),
- deviceMeasurementsMap.getOrDefault(
- nonCachedPaths.get(i).getDevice(), new HashSet<>()),
- context,
- dataSource,
- Long.MAX_VALUE,
- null);
+ nonCachedPaths
+ .get(i)
+ .createLastPointReader(
+ nonCachedDataTypes.get(i),
+ deviceMeasurementsMap.getOrDefault(
+ nonCachedPaths.get(i).getDevice(), new HashSet<>()),
+ context,
+ dataSource,
+ Long.MAX_VALUE,
+ null);
readerList.add(lastReader);
}
} finally {
@@ -230,13 +231,17 @@ public class LastQueryExecutor {
restDataType.addAll(dataTypes);
for (int i = 0; i < seriesPaths.size(); i++) {
resultContainer.add(new Pair<>(false, null));
+ PartialPath p = ((MeasurementPath) seriesPaths.get(i)).transformToExactPath();
+ restPaths.add(p);
+ restDataType.add(dataTypes.get(i));
}
}
for (int i = 0; i < cacheAccessors.size(); i++) {
TimeValuePair tvPair = cacheAccessors.get(i).read();
if (tvPair == null) {
resultContainer.add(new Pair<>(false, null));
- restPaths.add(seriesPaths.get(i));
+ PartialPath p = ((MeasurementPath) seriesPaths.get(i)).transformToExactPath();
+ restPaths.add(p);
restDataType.add(dataTypes.get(i));
} else if (!satisfyFilter(filter, tvPair)) {
resultContainer.add(new Pair<>(true, null));
@@ -260,15 +265,12 @@ public class LastQueryExecutor {
}
private static class LastCacheAccessor {
- private PartialPath path;
+
+ private final MeasurementPath path;
private IMeasurementMNode node;
LastCacheAccessor(PartialPath seriesPath) {
- if (seriesPath instanceof AlignedPath) {
- this.path = seriesPath.concatNode(((AlignedPath) seriesPath).getMeasurement(0));
- } else {
- this.path = seriesPath;
- }
+ this.path = (MeasurementPath) seriesPath;
}
public TimeValuePair read() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java
new file mode 100644
index 0000000..c8be28a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor.fill;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class AlignedLastPointReader extends LastPointReader {
+
+ public AlignedLastPointReader(
+ PartialPath seriesPath,
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter) {
+ super(seriesPath, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+ }
+
+ @Override
+ protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, (AlignedPath) seriesPath, context, filter);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index d839b2d..2b5b61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -43,23 +43,23 @@ import java.util.Set;
public class LastPointReader {
- private PartialPath seriesPath;
+ // if it is a common path, it will be MeasurementPath
+ // if it is a sub sensor of an aligned device, it will be AlignedPath with only one sub sensor
+ private final PartialPath seriesPath;
long queryTime;
TSDataType dataType;
- private QueryContext context;
+ private final QueryContext context;
// measurements of the same device as "seriesPath"
- private Set<String> deviceMeasurements;
+ private final Set<String> deviceMeasurements;
- private Filter timeFilter;
+ private final Filter timeFilter;
- private QueryDataSource dataSource;
+ private final QueryDataSource dataSource;
private IChunkMetadata cachedLastChunk;
- private List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
-
- public LastPointReader() {}
+ private final List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
public LastPointReader(
PartialPath seriesPath,
@@ -107,8 +107,7 @@ public class LastPointReader {
TsFileResource resource = seqFileResource.get(index);
ITimeSeriesMetadata timeseriesMetadata;
timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, seriesPath, context, timeFilter, deviceMeasurements);
+ loadTimeSeriesMetadata(resource, seriesPath, context, timeFilter, deviceMeasurements);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()
&& endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
@@ -132,7 +131,18 @@ public class LastPointReader {
return lastPoint;
}
- /** find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files */
+ protected ITimeSeriesMetadata loadTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, seriesPath, context, filter, allSensors);
+ }
+
+ /** find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files */
private void UnpackOverlappedUnseqFiles(long lBoundTime) throws IOException {
PriorityQueue<TsFileResource> unseqFileResource =
sortUnSeqFileResourcesInDecendingOrder(dataSource.getUnseqResources());
@@ -140,7 +150,7 @@ public class LastPointReader {
while (!unseqFileResource.isEmpty()
&& (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) {
ITimeSeriesMetadata timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
+ loadTimeSeriesMetadata(
unseqFileResource.poll(), seriesPath, context, timeFilter, deviceMeasurements);
if (timeseriesMetadata == null
@@ -161,7 +171,7 @@ public class LastPointReader {
private TimeValuePair getChunkLastPoint(IChunkMetadata chunkMetaData) throws IOException {
TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
- if (chunkMetaData == null) {
+ if (chunkMetaData == null || chunkMetaData.getStatistics() == null) {
return lastPoint;
}
Statistics chunkStatistics = chunkMetaData.getStatistics();
@@ -181,7 +191,9 @@ public class LastPointReader {
while (it.hasNext()) {
IPageReader pageReader = (IPageReader) it.next();
Statistics pageStatistics = pageReader.getStatistics();
- if (!pageReader.isModified() && endtimeContainedByTimeFilter(pageStatistics)) {
+ if (pageStatistics != null
+ && !pageReader.isModified()
+ && endtimeContainedByTimeFilter(pageStatistics)) {
lastPoint =
constructLastPair(pageStatistics.getEndTime(), pageStatistics.getLastValue(), dataType);
} else {
@@ -206,8 +218,8 @@ public class LastPointReader {
PriorityQueue<TsFileResource> unseqTsFilesSet =
new PriorityQueue<>(
(o1, o2) -> {
- Long maxTimeOfO1 = o1.getEndTime(seriesPath.getDevice());
- Long maxTimeOfO2 = o2.getEndTime(seriesPath.getDevice());
+ long maxTimeOfO1 = o1.getEndTime(seriesPath.getDevice());
+ long maxTimeOfO2 = o2.getEndTime(seriesPath.getDevice());
return Long.compare(maxTimeOfO2, maxTimeOfO1);
});
unseqTsFilesSet.addAll(tsFileResources);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index fcac3ce..708e494 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -112,13 +112,14 @@ public class AlignedPageReader implements IPageReader {
@Override
public Statistics getStatistics() {
- return valuePageReaderList.size() == 1
+ return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
? valuePageReaderList.get(0).getStatistics()
: timePageReader.getStatistics();
}
public Statistics getStatistics(int index) {
- return valuePageReaderList.get(index).getStatistics();
+ ValuePageReader valuePageReader = valuePageReaderList.get(index);
+ return valuePageReader == null ? null : valuePageReader.getStatistics();
}
@Override