You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/05 08:50:08 UTC

[iotdb] branch new_vector updated: support query non-exist sub sensor in one tsfile

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new ba368f9  support query non-exist sub sensor in one tsfile
ba368f9 is described below

commit ba368f9b0c2cedc810b0f076d8fb5065f19004e8
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Fri Nov 5 16:49:34 2021 +0800

    support query non-exist sub sensor in one tsfile
---
 .../chunk/metadata/DiskChunkMetadataLoader.java    |  8 +--
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 18 ++++--
 .../tsfile/file/metadata/AlignedChunkMetadata.java |  3 +-
 .../file/metadata/AlignedTimeSeriesMetadata.java   | 17 +++---
 .../tsfile/file/metadata/ITimeSeriesMetadata.java  |  2 -
 .../read/reader/chunk/AlignedChunkReader.java      | 65 +++++++++++++++-------
 .../tsfile/read/reader/page/AlignedPageReader.java | 31 ++++++-----
 7 files changed, 90 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index b1ea057..00bc21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.chunk.metadata;
 
+import java.util.List;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -26,14 +27,12 @@ import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
 
   private final TsFileResource resource;
@@ -54,7 +53,8 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
 
   @Override
   public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) {
-    List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+
+    List<IChunkMetadata> chunkMetadataList = ((TimeseriesMetadata)timeseriesMetadata).getChunkMetadataList();
 
     setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 85517fe..499c328 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -169,12 +169,10 @@ public class FileLoaderUtils {
       // the order of timeSeriesMetadata list is same as subSensorList's order
       TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance();
       List<String> valueMeasurementList = vectorPath.getMeasurementList();
-      ;
       Set<String> allSensors = new HashSet<>(valueMeasurementList);
       allSensors.add("");
       boolean isDebug = context.isDebug();
       String filePath = resource.getTsFilePath();
-      ;
       String deviceId = vectorPath.getDevice();
       TimeseriesMetadata timeColumn =
           cache.get(new TimeSeriesMetadataCacheKey(filePath, deviceId, ""), allSensors, isDebug);
@@ -183,18 +181,25 @@ public class FileLoaderUtils {
             new DiskChunkMetadataLoader(resource, vectorPath, context, filter));
         List<TimeseriesMetadata> valueTimeSeriesMetadataList =
             new ArrayList<>(valueMeasurementList.size());
+        // if none of the queried aligned sensors exist, we will return null
+        boolean exist = false;
         for (String valueMeasurement : valueMeasurementList) {
           TimeseriesMetadata valueColumn =
               cache.get(
                   new TimeSeriesMetadataCacheKey(filePath, deviceId, valueMeasurement),
                   allSensors,
                   isDebug);
-          valueColumn.setChunkMetadataLoader(
-              new DiskChunkMetadataLoader(resource, vectorPath, context, filter));
+          if (valueColumn != null) {
+            valueColumn.setChunkMetadataLoader(
+                new DiskChunkMetadataLoader(resource, vectorPath, context, filter));
+            exist = true;
+          }
           valueTimeSeriesMetadataList.add(valueColumn);
         }
-        alignedTimeSeriesMetadata =
-            new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList);
+        if (exist) {
+          alignedTimeSeriesMetadata =
+              new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList);
+        }
       }
     } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
       alignedTimeSeriesMetadata = (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata();
@@ -204,6 +209,7 @@ public class FileLoaderUtils {
       }
     }
 
+    // TODO Modification should be applied to each aligned sensor instead of only applying to time column
     if (alignedTimeSeriesMetadata != null) {
       List<Modification> pathModifications =
           context.getPathModifications(resource.getModFile(), vectorPath);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
index ee4f570..5b31ea4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
@@ -183,7 +183,8 @@ public class AlignedChunkMetadata implements IChunkMetadata {
   public List<Chunk> getValueChunkList() throws IOException {
     List<Chunk> valueChunkList = new ArrayList<>();
     for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
-      valueChunkList.add(chunkMetadata.getChunkLoader().loadChunk((ChunkMetadata) chunkMetadata));
+      valueChunkList.add(chunkMetadata == null ? null
+          : chunkMetadata.getChunkLoader().loadChunk((ChunkMetadata) chunkMetadata));
     }
     return valueChunkList;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
index b2a21ec..74649a3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
@@ -95,28 +95,29 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata {
       List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList();
       List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
       for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) {
-        valueChunkMetadataList.add(metadata.loadChunkMetadataList());
+        valueChunkMetadataList.add(metadata == null ? null : metadata.loadChunkMetadataList());
       }
 
       List<IChunkMetadata> res = new ArrayList<>();
 
       for (int i = 0; i < timeChunkMetadata.size(); i++) {
         List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+        // only if at least one sensor exists do we add the AlignedChunkMetadata to the list
+        boolean exits = false;
         for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) {
-          chunkMetadataList.add(chunkMetadata.get(i));
+          IChunkMetadata v = chunkMetadata == null ? null : chunkMetadata.get(i);
+          exits = (exits || v != null);
+          chunkMetadataList.add(v);
+        }
+        if (exits) {
+          res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList));
         }
-        res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList));
       }
       return res;
     }
   }
 
   @Override
-  public List<IChunkMetadata> getChunkMetadataList() {
-    return null;
-  }
-
-  @Override
   public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) {
     timeseriesMetadata.setChunkMetadataLoader(chunkMetadataLoader);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
index 6bb6ec1..3a773b1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
@@ -38,7 +38,5 @@ public interface ITimeSeriesMetadata {
 
   List<IChunkMetadata> loadChunkMetadataList() throws IOException;
 
-  List<IChunkMetadata> getChunkMetadataList();
-
   void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader);
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
index 8afd569..d04a35e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -79,10 +79,10 @@ public class AlignedChunkReader implements IChunkReader {
     List<Statistics> valueChunkStatisticsList = new ArrayList<>();
     valueChunkList.forEach(
         chunk -> {
-          this.valueChunkHeaderList.add(chunk.getHeader());
-          this.valueChunkDataBufferList.add(chunk.getData());
-          valueChunkStatisticsList.add(chunk.getChunkStatistic());
-          valueDeleteIntervalList.add(chunk.getDeleteIntervalList());
+          valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader());
+          valueChunkDataBufferList.add(chunk == null ? null : chunk.getData());
+          valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic());
+          valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList());
         });
     initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
   }
@@ -96,27 +96,41 @@ public class AlignedChunkReader implements IChunkReader {
       // deserialize a PageHeader from chunkDataBuffer
       PageHeader timePageHeader;
       List<PageHeader> valuePageHeaderList = new ArrayList<>();
-      // mask the two highest bit
+
+      boolean exits = false;
       // this chunk has only one page
       if ((timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
         timePageHeader = PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics);
         for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
-          valuePageHeaderList.add(
-              PageHeader.deserializeFrom(
-                  valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i)));
+          if (valueChunkDataBufferList.get(i) != null) {
+            exits = true;
+            valuePageHeaderList.add(
+                PageHeader.deserializeFrom(
+                    valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i)));
+          } else {
+            valuePageHeaderList.add(null);
+          }
         }
       } else { // this chunk has more than one page
         timePageHeader =
             PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType());
         for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
-          valuePageHeaderList.add(
-              PageHeader.deserializeFrom(
-                  valueChunkDataBufferList.get(i), valueChunkHeaderList.get(i).getDataType()));
+          if (valueChunkDataBufferList.get(i) != null) {
+            exits = true;
+            valuePageHeaderList.add(
+                PageHeader.deserializeFrom(
+                    valueChunkDataBufferList.get(i), valueChunkHeaderList.get(i).getDataType()));
+          } else {
+            valuePageHeaderList.add(null);
+          }
         }
       }
       // if the current page satisfies
-      if (pageSatisfied(timePageHeader)) {
-        pageReaderList.add(constructPageReaderForNextPage(timePageHeader, valuePageHeaderList));
+      if (exits && pageSatisfied(timePageHeader)) {
+        AlignedPageReader alignedPageReader = constructPageReaderForNextPage(timePageHeader, valuePageHeaderList);
+        if (alignedPageReader != null) {
+          pageReaderList.add(alignedPageReader);
+        }
       } else {
         skipBytesInStreamByLength(timePageHeader, valuePageHeaderList);
       }
@@ -151,9 +165,14 @@ public class AlignedChunkReader implements IChunkReader {
     List<ByteBuffer> valuePageDataList = new ArrayList<>();
     List<TSDataType> valueDataTypeList = new ArrayList<>();
     List<Decoder> valueDecoderList = new ArrayList<>();
+    boolean exist = false;
     for (int i = 0; i < valuePageHeader.size(); i++) {
-      // if the page is satisfied, deserialize it
-      if (pageSatisfied(valuePageHeader.get(i), valueDeleteIntervalList.get(i))) {
+      if (valuePageHeader.get(i) == null) {
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      } else if (pageSatisfied(valuePageHeader.get(i), valueDeleteIntervalList.get(i))) { // if the page is satisfied, deserialize it
         getPageInfo(
             valuePageHeader.get(i),
             valueChunkDataBufferList.get(i),
@@ -163,6 +182,7 @@ public class AlignedChunkReader implements IChunkReader {
         valuePageDataList.add(valuePageInfo.pageData);
         valueDataTypeList.add(valuePageInfo.dataType);
         valueDecoderList.add(valuePageInfo.decoder);
+        exist = true;
       } else { // if the page is not satisfied, just skip it
         valueChunkDataBufferList
             .get(i)
@@ -175,6 +195,9 @@ public class AlignedChunkReader implements IChunkReader {
         valueDecoderList.add(null);
       }
     }
+    if (!exist) {
+      return null;
+    }
     AlignedPageReader alignedPageReader =
         new AlignedPageReader(
             timePageHeader,
@@ -247,11 +270,13 @@ public class AlignedChunkReader implements IChunkReader {
     timeChunkDataBuffer.position(
         timeChunkDataBuffer.position() + timePageHeader.getCompressedSize());
     for (int i = 0; i < valuePageHeader.size(); i++) {
-      valueChunkDataBufferList
-          .get(i)
-          .position(
-              valueChunkDataBufferList.get(i).position()
-                  + valuePageHeader.get(i).getCompressedSize());
+      if (valuePageHeader.get(i) != null) {
+        valueChunkDataBufferList
+            .get(i)
+            .position(
+                valueChunkDataBufferList.get(i).position()
+                    + valuePageHeader.get(i).getCompressedSize());
+      }
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index b4549f9..78e432a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -55,14 +55,19 @@ public class AlignedPageReader implements IPageReader {
     isModified = timePageReader.isModified();
     valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
     for (int i = 0; i < valuePageHeaderList.size(); i++) {
-      ValuePageReader valuePageReader =
-          new ValuePageReader(
-              valuePageHeaderList.get(i),
-              valuePageDataList.get(i),
-              valueDataTypeList.get(i),
-              valueDecoderList.get(i));
-      valuePageReaderList.add(valuePageReader);
-      isModified = isModified && valuePageReader.isModified();
+      if (valuePageHeaderList.get(i) != null) {
+        ValuePageReader valuePageReader =
+            new ValuePageReader(
+                valuePageHeaderList.get(i),
+                valuePageDataList.get(i),
+                valueDataTypeList.get(i),
+                valueDecoderList.get(i));
+        valuePageReaderList.add(valuePageReader);
+        isModified = isModified && valuePageReader.isModified();
+      } else {
+        valuePageReaderList.add(null);
+      }
+
     }
     this.filter = filter;
     this.valueCount = valuePageReaderList.size();
@@ -73,14 +78,14 @@ public class AlignedPageReader implements IPageReader {
     long[] timeBatch = timePageReader.nexTimeBatch();
     // if the vector contains only one sub sensor, just return a common BatchData whose DataType is
     // same as the only one sub sensor.
-    if (valuePageReaderList.size() == 1) {
-      return valuePageReaderList.get(0).nextBatch(timeBatch, ascending, filter);
-    }
+//    if (valuePageReaderList.size() == 1) {
+//      return valuePageReaderList.get(0).nextBatch(timeBatch, ascending, filter);
+//    }
 
     // if the vector contains more than on sub sensor, the BatchData's DataType is Vector
     List<TsPrimitiveType[]> valueBatchList = new ArrayList<>(valueCount);
     for (ValuePageReader valuePageReader : valuePageReaderList) {
-      valueBatchList.add(valuePageReader.nextValueBatch(timeBatch));
+      valueBatchList.add(valuePageReader == null ? null : valuePageReader.nextValueBatch(timeBatch));
     }
     BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
     boolean isNull;
@@ -89,7 +94,7 @@ public class AlignedPageReader implements IPageReader {
       isNull = true;
       TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
       for (int j = 0; j < v.length; j++) {
-        v[j] = valueBatchList.get(j)[i];
+        v[j] = valueBatchList.get(j) == null ? null : valueBatchList.get(j)[i];
         if (v[j] != null) {
           isNull = false;
         }