You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/20 15:12:11 UTC

[iotdb] branch NewTsFileV2 updated: add getMeasurementChunkMetadataListMapIterator method (#2530)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch NewTsFileV2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/NewTsFileV2 by this push:
     new 83e86af  add getMeasurementChunkMetadataListMapIterator method (#2530)
83e86af is described below

commit 83e86af3dbdf69810460bbaf00cb93760eb74e81
Author: Steve Yurong Su <st...@outlook.com>
AuthorDate: Wed Jan 20 23:11:56 2021 +0800

    add getMeasurementChunkMetadataListMapIterator method (#2530)
---
 .../tsfile/exception/TsFileRuntimeException.java   |  2 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 92 +++++++++++++++++++++-
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
index 812ea87..666d156 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.tsfile.exception;
  * This Exception is the parent class for all runtime exceptions.<br>
  * This Exception extends super class {@link java.lang.RuntimeException}
  */
-public abstract class TsFileRuntimeException extends RuntimeException {
+public class TsFileRuntimeException extends RuntimeException {
 
   private static final long serialVersionUID = 6455048223316780984L;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index e059751..be34c85 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -27,8 +27,12 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +43,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -697,8 +702,7 @@ public class TsFileSequenceReader implements AutoCloseable {
             .getChildIndexEntry(name, false);
         ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
         return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name,
-            isDeviceLevel,
-            false);
+            isDeviceLevel, false);
       }
     } catch (BufferOverflowException e) {
       logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
@@ -1188,4 +1192,88 @@ public class TsFileSequenceReader implements AutoCloseable {
   public long getMaxPlanIndex() {
     return maxPlanIndex;
   }
+
+  /**
+   * Returns an iterator over the chunk metadata of one device, one leaf measurement range per
+   * step. The offset ranges are collected before this method returns; the time series metadata
+   * of a range is only read on {@code next()}, which rethrows an {@link IOException} wrapped in
+   * a {@link TsFileRuntimeException}.
+   *
+   * @param device the device passed to the metadata index lookup
+   * @return iterator of measurement id to chunk metadata list maps; empty if the lookup is null
+   * @throws IOException if the up-front file metadata or metadata index reads fail
+   */
+  public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator(
+      String device) throws IOException {
+    readFileMetadata();
+
+    Pair<MetadataIndexEntry, Long> metadataIndexPair =
+        getMetadataAndEndOffset(tsFileMetaData.getMetadataIndex(), device, true, true);
+
+    if (metadataIndexPair == null) {
+      return Collections.emptyIterator();
+    }
+
+    Queue<Pair<Long, Long>> queue = new LinkedList<>();
+    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+    collectEachLeafMeasurementNodeOffsetRange(buffer, MetadataIndexNodeType.INTERNAL_MEASUREMENT,
+        queue);
+
+    return new Iterator<Map<String, List<ChunkMetadata>>>() {
+      @Override
+      public boolean hasNext() {
+        return !queue.isEmpty();
+      }
+
+      @Override
+      public Map<String, List<ChunkMetadata>> next() {
+        if (queue.isEmpty()) {
+          throw new NoSuchElementException();
+        }
+        Pair<Long, Long> startEndPair = queue.remove();
+        Map<String, List<ChunkMetadata>> measurementChunkMetadataList = new HashMap<>();
+        try {
+          ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right);
+          // Fix: loop on nextBuffer; the outer index buffer is never advanced by this loop.
+          while (nextBuffer.hasRemaining()) {
+            TimeseriesMetadata metadata = TimeseriesMetadata.deserializeFrom(nextBuffer, true);
+            measurementChunkMetadataList
+                .computeIfAbsent(metadata.getMeasurementId(), m -> new ArrayList<>())
+                .addAll(metadata.getChunkMetadataList());
+          }
+          return measurementChunkMetadataList;
+        } catch (IOException e) {
+          TsFileRuntimeException failure = new TsFileRuntimeException(
+              "Error occurred while reading a time series metadata block.");
+          failure.initCause(e); // preserve the underlying I/O error for callers
+          throw failure;
+        }
+      }
+    };
+  }
+
+  /**
+   * Queues each child's offset range if {@code type} is LEAF_MEASUREMENT, else recurses into it.
+   * NOTE(review): {@code type} is forwarded unchanged while recursing - confirm this is intended.
+   */
+  private void collectEachLeafMeasurementNodeOffsetRange(ByteBuffer buffer,
+      MetadataIndexNodeType type, Queue<Pair<Long, Long>> queue) throws IOException {
+    try {
+      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
+      for (int i = 0, size = metadataIndexNode.getChildren().size(); i < size; ++i) {
+        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
+        long endOffset = i == size - 1 ? metadataIndexNode.getEndOffset()
+            : metadataIndexNode.getChildren().get(i + 1).getOffset();
+        if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+          queue.add(new Pair<>(startOffset, endOffset));
+          continue; // was "return", which dropped every child after the first
+        }
+        collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), type, queue);
+      }
+    } catch (BufferOverflowException e) {
+      logger.error("Error occurred while collecting offset ranges of measurement nodes of file {}",
+          file);
+      throw e;
+    }
+  }
 }