You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/20 15:12:11 UTC
[iotdb] branch NewTsFileV2 updated: add
getMeasurementChunkMetadataListMapIterator method (#2530)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch NewTsFileV2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/NewTsFileV2 by this push:
new 83e86af add getMeasurementChunkMetadataListMapIterator method (#2530)
83e86af is described below
commit 83e86af3dbdf69810460bbaf00cb93760eb74e81
Author: Steve Yurong Su <st...@outlook.com>
AuthorDate: Wed Jan 20 23:11:56 2021 +0800
add getMeasurementChunkMetadataListMapIterator method (#2530)
---
.../tsfile/exception/TsFileRuntimeException.java | 2 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 92 +++++++++++++++++++++-
2 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
index 812ea87..666d156 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/TsFileRuntimeException.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.tsfile.exception;
* This Exception is the parent class for all runtime exceptions.<br>
* This Exception extends super class {@link java.lang.RuntimeException}
*/
-public abstract class TsFileRuntimeException extends RuntimeException {
+public class TsFileRuntimeException extends RuntimeException {
private static final long serialVersionUID = 6455048223316780984L;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index e059751..be34c85 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -27,8 +27,12 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +43,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -697,8 +702,7 @@ public class TsFileSequenceReader implements AutoCloseable {
.getChildIndexEntry(name, false);
ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name,
- isDeviceLevel,
- false);
+ isDeviceLevel, false);
}
} catch (BufferOverflowException e) {
logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
@@ -1188,4 +1192,88 @@ public class TsFileSequenceReader implements AutoCloseable {
public long getMaxPlanIndex() {
return maxPlanIndex;
}
+
+  /**
+   * Returns an iterator over the chunk metadata of one device, producing one map per collected
+   * measurement-node offset range.
+   *
+   * <p>The offset ranges are collected eagerly when this method is called; each call to
+   * {@code next()} then reads one range from the file and deserializes the
+   * {@link TimeseriesMetadata} entries stored in it.
+   *
+   * @param device the device whose metadata index entry is looked up
+   * @return an iterator whose elements map a measurement id to the chunk metadata found in one
+   *     offset range; an iterator with no elements if no index entry is found for the device.
+   *     Its {@code next()} throws {@link NoSuchElementException} when exhausted and
+   *     {@link TsFileRuntimeException} if reading a range fails.
+   * @throws IOException if reading the file metadata or the metadata index nodes fails
+   */
+  public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator(
+      String device) throws IOException {
+    readFileMetadata();
+
+    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
+    Pair<MetadataIndexEntry, Long> metadataIndexPair =
+        getMetadataAndEndOffset(metadataIndexNode, device, true, true);
+
+    if (metadataIndexPair == null) {
+      // No index entry for this device: hand back an iterator with nothing to iterate.
+      return new Iterator<Map<String, List<ChunkMetadata>>>() {
+
+        @Override
+        public boolean hasNext() {
+          return false;
+        }
+
+        @Override
+        public Map<String, List<ChunkMetadata>> next() {
+          throw new NoSuchElementException();
+        }
+      };
+    }
+
+    Queue<Pair<Long, Long>> queue = new LinkedList<>();
+    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+    collectEachLeafMeasurementNodeOffsetRange(buffer, MetadataIndexNodeType.INTERNAL_MEASUREMENT,
+        queue);
+
+    return new Iterator<Map<String, List<ChunkMetadata>>>() {
+
+      @Override
+      public boolean hasNext() {
+        return !queue.isEmpty();
+      }
+
+      @Override
+      public Map<String, List<ChunkMetadata>> next() {
+        // Queue.remove() throws NoSuchElementException when the queue is empty, which is
+        // exactly what the Iterator contract requires.
+        Pair<Long, Long> startEndPair = queue.remove();
+        Map<String, List<ChunkMetadata>> measurementChunkMetadataList = new HashMap<>();
+        try {
+          ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right);
+          // Drain the buffer that was just read. The loop must test nextBuffer, not the
+          // index-node buffer of the enclosing method, whose position never changes here.
+          while (nextBuffer.hasRemaining()) {
+            TimeseriesMetadata timeseriesMetadata =
+                TimeseriesMetadata.deserializeFrom(nextBuffer, true);
+            measurementChunkMetadataList
+                .computeIfAbsent(timeseriesMetadata.getMeasurementId(), m -> new ArrayList<>())
+                .addAll(timeseriesMetadata.getChunkMetadataList());
+          }
+          return measurementChunkMetadataList;
+        } catch (IOException e) {
+          TsFileRuntimeException wrapped = new TsFileRuntimeException(
+              "Error occurred while reading a time series metadata block.");
+          // Keep the underlying I/O failure attached so it is not lost to callers.
+          wrapped.initCause(e);
+          throw wrapped;
+        }
+      }
+    };
+  }
+
+  /**
+   * Walks the metadata index subtree serialized in {@code buffer} and appends the
+   * [start, end) file offset range of every child of each leaf measurement node to
+   * {@code queue}.
+   *
+   * <p>Whether a node is a leaf is decided from the type recorded in the deserialized node
+   * itself. The caller-supplied {@code type} cannot be used for that decision: it is handed
+   * down unchanged through the recursion, so it would describe every level of the tree the
+   * same way.
+   *
+   * @param buffer serialized {@link MetadataIndexNode} to start from
+   * @param type node type supplied by the caller; kept for signature compatibility and passed
+   *     through to recursive calls, but not consulted
+   * @param queue receives one (startOffset, endOffset) pair per child of a leaf measurement node
+   * @throws IOException if reading a child node from the file fails
+   */
+  private void collectEachLeafMeasurementNodeOffsetRange(ByteBuffer buffer,
+      MetadataIndexNodeType type, Queue<Pair<Long, Long>> queue)
+      throws IOException {
+    try {
+      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
+      // NOTE(review): relies on MetadataIndexNode#getNodeType() exposing the serialized node
+      // type - confirm against the MetadataIndexNode class.
+      final boolean isLeafMeasurementNode =
+          MetadataIndexNodeType.LEAF_MEASUREMENT.equals(metadataIndexNode.getNodeType());
+      final int metadataIndexListSize = metadataIndexNode.getChildren().size();
+      for (int i = 0; i < metadataIndexListSize; ++i) {
+        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
+        // A child ends where its next sibling starts; the last child ends at the node's end.
+        long endOffset = metadataIndexNode.getEndOffset();
+        if (i != metadataIndexListSize - 1) {
+          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
+        }
+        if (isLeafMeasurementNode) {
+          queue.add(new Pair<>(startOffset, endOffset));
+          // Move on to the next child; returning here would drop every remaining range.
+          continue;
+        }
+        collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), type, queue);
+      }
+    } catch (BufferOverflowException e) {
+      logger.error("Error occurred while collecting offset ranges of measurement nodes of file {}",
+          file);
+      throw e;
+    }
+  }
}