You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/11 02:44:28 UTC

[iotdb] branch new_vector updated: try to adapt Last query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new c2b67a7  try to adapt Last query
c2b67a7 is described below

commit c2b67a7b2f8eef0105f0f079d80988aca12298b5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Nov 11 10:43:53 2021 +0800

    try to adapt Last query
---
 .../apache/iotdb/db/metadata/path/AlignedPath.java | 13 +++++
 .../iotdb/db/metadata/path/MeasurementPath.java    | 13 +++++
 .../apache/iotdb/db/metadata/path/PartialPath.java | 11 ++++
 .../iotdb/db/query/executor/LastQueryExecutor.java | 36 +++++++-------
 .../executor/fill/AlignedLastPointReader.java      | 58 ++++++++++++++++++++++
 .../db/query/executor/fill/LastPointReader.java    | 44 ++++++++++------
 .../tsfile/read/reader/page/AlignedPageReader.java |  5 +-
 7 files changed, 145 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 57262cb..0cfd926 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -210,6 +211,18 @@ public class AlignedPath extends PartialPath {
   }
 
   @Override
+  public AlignedLastPointReader createLastPointReader(
+      TSDataType dataType,
+      Set<String> deviceMeasurements,
+      QueryContext context,
+      QueryDataSource dataSource,
+      long queryTime,
+      Filter timeFilter) {
+    return new AlignedLastPointReader(
+        this, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+  }
+
+  @Override
   public AlignedSeriesReader createSeriesReader(
       Set<String> allSensors,
       TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 840b8c2..b9edd98 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -124,6 +125,18 @@ public class MeasurementPath extends PartialPath {
     return isUnderAlignedEntity ? new AlignedPath(this) : this;
   }
 
+  @Override
+  public LastPointReader createLastPointReader(
+      TSDataType dataType,
+      Set<String> deviceMeasurements,
+      QueryContext context,
+      QueryDataSource dataSource,
+      long queryTime,
+      Filter timeFilter) {
+    return new LastPointReader(
+        this, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+  }
+
   public SeriesReader createSeriesReader(
       Set<String> allSensors,
       TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 47d348e..eb6e0fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -371,6 +372,16 @@ public class PartialPath extends Path implements Comparable<Path> {
     return ret;
   }
 
+  public LastPointReader createLastPointReader(
+      TSDataType dataType,
+      Set<String> deviceMeasurements,
+      QueryContext context,
+      QueryDataSource dataSource,
+      long queryTime,
+      Filter timeFilter) {
+    throw new UnsupportedOperationException("Should call exact sub class!");
+  }
+
   public SeriesReader createSeriesReader(
       Set<String> allSensors,
       TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 7931c54..5da5cc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -175,15 +175,16 @@ public class LastQueryExecutor {
             QueryResourceManager.getInstance()
                 .getQueryDataSource(nonCachedPaths.get(i), context, null);
         LastPointReader lastReader =
-            new LastPointReader(
-                nonCachedPaths.get(i),
-                nonCachedDataTypes.get(i),
-                deviceMeasurementsMap.getOrDefault(
-                    nonCachedPaths.get(i).getDevice(), new HashSet<>()),
-                context,
-                dataSource,
-                Long.MAX_VALUE,
-                null);
+            nonCachedPaths
+                .get(i)
+                .createLastPointReader(
+                    nonCachedDataTypes.get(i),
+                    deviceMeasurementsMap.getOrDefault(
+                        nonCachedPaths.get(i).getDevice(), new HashSet<>()),
+                    context,
+                    dataSource,
+                    Long.MAX_VALUE,
+                    null);
         readerList.add(lastReader);
       }
     } finally {
@@ -230,13 +231,17 @@ public class LastQueryExecutor {
       restDataType.addAll(dataTypes);
       for (int i = 0; i < seriesPaths.size(); i++) {
         resultContainer.add(new Pair<>(false, null));
+        PartialPath p = ((MeasurementPath) seriesPaths.get(i)).transformToExactPath();
+        restPaths.add(p);
+        restDataType.add(dataTypes.get(i));
       }
     }
     for (int i = 0; i < cacheAccessors.size(); i++) {
       TimeValuePair tvPair = cacheAccessors.get(i).read();
       if (tvPair == null) {
         resultContainer.add(new Pair<>(false, null));
-        restPaths.add(seriesPaths.get(i));
+        PartialPath p = ((MeasurementPath) seriesPaths.get(i)).transformToExactPath();
+        restPaths.add(p);
         restDataType.add(dataTypes.get(i));
       } else if (!satisfyFilter(filter, tvPair)) {
         resultContainer.add(new Pair<>(true, null));
@@ -260,15 +265,12 @@ public class LastQueryExecutor {
   }
 
   private static class LastCacheAccessor {
-    private PartialPath path;
+
+    private final MeasurementPath path;
     private IMeasurementMNode node;
 
     LastCacheAccessor(PartialPath seriesPath) {
-      if (seriesPath instanceof AlignedPath) {
-        this.path = seriesPath.concatNode(((AlignedPath) seriesPath).getMeasurement(0));
-      } else {
-        this.path = seriesPath;
-      }
+      this.path = (MeasurementPath) seriesPath;
     }
 
     public TimeValuePair read() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java
new file mode 100644
index 0000000..c8be28a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/AlignedLastPointReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor.fill;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class AlignedLastPointReader extends LastPointReader {
+
+  public AlignedLastPointReader(
+      PartialPath seriesPath,
+      TSDataType dataType,
+      Set<String> deviceMeasurements,
+      QueryContext context,
+      QueryDataSource dataSource,
+      long queryTime,
+      Filter timeFilter) {
+    super(seriesPath, dataType, deviceMeasurements, context, dataSource, queryTime, timeFilter);
+  }
+
+  @Override
+  protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, (AlignedPath) seriesPath, context, filter);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index d839b2d..2b5b61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -43,23 +43,23 @@ import java.util.Set;
 
 public class LastPointReader {
 
-  private PartialPath seriesPath;
+  // if it is a common path, it will be MeasurementPath
+  // if it is a sub sensor of an aligned device, it will be AlignedPath with only one sub sensor
+  private final PartialPath seriesPath;
   long queryTime;
   TSDataType dataType;
-  private QueryContext context;
+  private final QueryContext context;
 
   // measurements of the same device as "seriesPath"
-  private Set<String> deviceMeasurements;
+  private final Set<String> deviceMeasurements;
 
-  private Filter timeFilter;
+  private final Filter timeFilter;
 
-  private QueryDataSource dataSource;
+  private final QueryDataSource dataSource;
 
   private IChunkMetadata cachedLastChunk;
 
-  private List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
-
-  public LastPointReader() {}
+  private final List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
 
   public LastPointReader(
       PartialPath seriesPath,
@@ -107,8 +107,7 @@ public class LastPointReader {
       TsFileResource resource = seqFileResource.get(index);
       ITimeSeriesMetadata timeseriesMetadata;
       timeseriesMetadata =
-          FileLoaderUtils.loadTimeSeriesMetadata(
-              resource, seriesPath, context, timeFilter, deviceMeasurements);
+          loadTimeSeriesMetadata(resource, seriesPath, context, timeFilter, deviceMeasurements);
       if (timeseriesMetadata != null) {
         if (!timeseriesMetadata.isModified()
             && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
@@ -132,7 +131,18 @@ public class LastPointReader {
     return lastPoint;
   }
 
-  /** find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files */
+  protected ITimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, seriesPath, context, filter, allSensors);
+  }
+
+  /** find the last TimeseriesMetadata in unseq files and unpack all overlapped unseq files */
   private void UnpackOverlappedUnseqFiles(long lBoundTime) throws IOException {
     PriorityQueue<TsFileResource> unseqFileResource =
         sortUnSeqFileResourcesInDecendingOrder(dataSource.getUnseqResources());
@@ -140,7 +150,7 @@ public class LastPointReader {
     while (!unseqFileResource.isEmpty()
         && (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) {
       ITimeSeriesMetadata timeseriesMetadata =
-          FileLoaderUtils.loadTimeSeriesMetadata(
+          loadTimeSeriesMetadata(
               unseqFileResource.poll(), seriesPath, context, timeFilter, deviceMeasurements);
 
       if (timeseriesMetadata == null
@@ -161,7 +171,7 @@ public class LastPointReader {
 
   private TimeValuePair getChunkLastPoint(IChunkMetadata chunkMetaData) throws IOException {
     TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
-    if (chunkMetaData == null) {
+    if (chunkMetaData == null || chunkMetaData.getStatistics() == null) {
       return lastPoint;
     }
     Statistics chunkStatistics = chunkMetaData.getStatistics();
@@ -181,7 +191,9 @@ public class LastPointReader {
     while (it.hasNext()) {
       IPageReader pageReader = (IPageReader) it.next();
       Statistics pageStatistics = pageReader.getStatistics();
-      if (!pageReader.isModified() && endtimeContainedByTimeFilter(pageStatistics)) {
+      if (pageStatistics != null
+          && !pageReader.isModified()
+          && endtimeContainedByTimeFilter(pageStatistics)) {
         lastPoint =
             constructLastPair(pageStatistics.getEndTime(), pageStatistics.getLastValue(), dataType);
       } else {
@@ -206,8 +218,8 @@ public class LastPointReader {
     PriorityQueue<TsFileResource> unseqTsFilesSet =
         new PriorityQueue<>(
             (o1, o2) -> {
-              Long maxTimeOfO1 = o1.getEndTime(seriesPath.getDevice());
-              Long maxTimeOfO2 = o2.getEndTime(seriesPath.getDevice());
+              long maxTimeOfO1 = o1.getEndTime(seriesPath.getDevice());
+              long maxTimeOfO2 = o2.getEndTime(seriesPath.getDevice());
               return Long.compare(maxTimeOfO2, maxTimeOfO1);
             });
     unseqTsFilesSet.addAll(tsFileResources);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index fcac3ce..708e494 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -112,13 +112,14 @@ public class AlignedPageReader implements IPageReader {
 
   @Override
   public Statistics getStatistics() {
-    return valuePageReaderList.size() == 1
+    return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
         ? valuePageReaderList.get(0).getStatistics()
         : timePageReader.getStatistics();
   }
 
   public Statistics getStatistics(int index) {
-    return valuePageReaderList.get(index).getStatistics();
+    ValuePageReader valuePageReader = valuePageReaderList.get(index);
+    return valuePageReader == null ? null : valuePageReader.getStatistics();
   }
 
   @Override