You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/12/30 07:11:55 UTC
[iotdb] 01/02: Add iterator to return device list in dictionary order
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch jira_2228
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 605bc4b9978f10fcd19ba33fb7d3970dbf9bc863
Author: Zesong Sun <v-...@microsoft.com>
AuthorDate: Thu Dec 30 15:07:38 2021 +0800
Add iterator to return device list in dictionary order
---
.../iotdb/tsfile/read/TsFileSequenceReader.java | 82 ++++++++++++++++++----
.../iotdb/tsfile/read/GetAllDevicesTest.java | 2 +-
.../apache/iotdb/tsfile/utils/FileGenerator.java | 32 ++++++++-
.../tsfile/write/MetadataIndexConstructorTest.java | 25 +++----
4 files changed, 107 insertions(+), 34 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index a8fd34a..48a04c5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -591,8 +591,6 @@ public class TsFileSequenceReader implements AutoCloseable {
private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException {
List<String> deviceList = new ArrayList<>();
- int metadataIndexListSize = metadataIndexNode.getChildren().size();
-
// if metadataIndexNode is LEAF_DEVICE, put all devices in node entry into the list
if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
deviceList.addAll(
@@ -602,6 +600,7 @@ public class TsFileSequenceReader implements AutoCloseable {
return deviceList;
}
+ int metadataIndexListSize = metadataIndexNode.getChildren().size();
for (int i = 0; i < metadataIndexListSize; i++) {
long endOffset = metadataIndexNode.getEndOffset();
if (i != metadataIndexListSize - 1) {
@@ -609,21 +608,77 @@ public class TsFileSequenceReader implements AutoCloseable {
}
ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer);
- if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
- // if node in next level is LEAF_DEVICE, put all devices in node entry into the list
- deviceList.addAll(
- node.getChildren().stream()
- .map(MetadataIndexEntry::getName)
- .collect(Collectors.toList()));
- } else {
- // keep traversing
- deviceList.addAll(getAllDevices(node));
- }
+ deviceList.addAll(getAllDevices(node));
}
return deviceList;
}
/**
+ * @return an iterator of "device, isAligned" list, in which names of devices are ordered in
+ * dictionary order, and isAligned represents whether the device is aligned
+ */
+ public Iterator<Pair<String, Boolean>> getAllDevicesIteratorWithIsAligned() throws IOException {
+ readFileMetadata();
+ // gather every device's name and offsets up front; the aligned check is deferred to next()
+ MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
+ Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>();
+ getAllDevicesWithIsAligned(metadataIndexNode, queue);
+
+ return new Iterator<Pair<String, Boolean>>() {
+ @Override
+ public boolean hasNext() {
+ return !queue.isEmpty();
+ }
+
+ @Override
+ public Pair<String, Boolean> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
+ // startEndPair.right is the (start, end) offset pair of this device's measurement node
+ try {
+ MetadataIndexNode measurementNode =
+ MetadataIndexNode.deserializeFrom(
+ readData(startEndPair.right.left, startEndPair.right.right));
+ // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned
+ boolean isAligned = tryToGetFirstTimeseriesMetadata(measurementNode) != null;
+ return new Pair<>(startEndPair.left, isAligned);
+ } catch (IOException e) {
+ throw new TsFileRuntimeException(
+ "Error occurred while reading a time series metadata block.");
+ }
+ }
+ };
+ }
+
+ private void getAllDevicesWithIsAligned(
+ MetadataIndexNode metadataIndexNode, Queue<Pair<String, Pair<Long, Long>>> queue)
+ throws IOException {
+ try {
+ int metadataIndexListSize = metadataIndexNode.getChildren().size();
+ // queue each LEAF_DEVICE entry with its (start, end) offsets; recurse into any other node type
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ MetadataIndexEntry entry = metadataIndexNode.getChildren().get(i);
+ long startOffset = entry.getOffset();
+ long endOffset = metadataIndexNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
+ }
+ if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+ queue.add(new Pair<>(entry.getName(), new Pair<>(startOffset, endOffset)));
+ continue;
+ }
+ ByteBuffer nextBuffer = readData(startOffset, endOffset);
+ getAllDevicesWithIsAligned(MetadataIndexNode.deserializeFrom(nextBuffer), queue);
+ }
+ } catch (BufferOverflowException e) {
+ logger.error("Something error happened while getting all devices of file {}", file);
+ throw e;
+ }
+ }
+
+ /**
* read all ChunkMetaDatas of given device
*
* @param device name
@@ -773,8 +828,7 @@ public class TsFileSequenceReader implements AutoCloseable {
readData(
measurementNode.getChildren().get(0).getOffset(), measurementNode.getEndOffset());
}
- TimeseriesMetadata firstTimeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true);
- return firstTimeseriesMetadata;
+ return TimeseriesMetadata.deserializeFrom(buffer, true);
} else if (measurementNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
ByteBuffer buffer =
readData(
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java
index 54fceae..a7a3cea 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java
@@ -76,7 +76,7 @@ public class GetAllDevicesTest {
List<String> devices = fileReader.getAllDevices();
Assert.assertEquals(deviceNum, devices.size());
for (int i = 0; i < deviceNum; i++) {
- Assert.assertTrue(devices.contains("d" + i));
+ Assert.assertEquals("d" + FileGenerator.generateIndexString(i, deviceNum), devices.get(i));
}
FileGenerator.after();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
index 517f252..c46dae9 100755
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
@@ -223,7 +223,15 @@ public class FileGenerator {
long startTime = 1480562618000L;
for (int i = 0; i < deviceNum; i++) {
for (int j = 0; j < measurementNum; j++) {
- String d = "d" + i + "," + startTime + ",s" + j + "," + 1;
+ String d =
+ "d"
+ + generateIndexString(i, deviceNum)
+ + ","
+ + startTime
+ + ",s"
+ + generateIndexString(j, measurementNum)
+ + ","
+ + 1;
fw.write(d + "\r\n");
}
}
@@ -294,9 +302,11 @@ public class FileGenerator {
for (int i = 0; i < deviceNum; i++) {
for (int j = 0; j < measurementNum; j++) {
schema.registerTimeseries(
- new Path("d" + i),
+ new Path("d" + generateIndexString(i, deviceNum)),
new UnaryMeasurementSchema(
- "s" + j, TSDataType.INT32, TSEncoding.valueOf(config.getValueEncoder())));
+ "s" + generateIndexString(j, measurementNum),
+ TSDataType.INT32,
+ TSEncoding.valueOf(config.getValueEncoder())));
}
}
}
@@ -331,4 +341,20 @@ public class FileGenerator {
return null;
}
}
+
+ /**
+ * Converts curIndex to a string, left-padded with "0" to the length of maxIndex's string.
+ *
+ * @param curIndex index to convert
+ * @param maxIndex value whose decimal string length sets the minimum width of the result
+ * @return curIndex as a string, unchanged if it is already at least that long
+ */
+ public static String generateIndexString(int curIndex, int maxIndex) {
+ StringBuilder res = new StringBuilder(String.valueOf(curIndex));
+ String target = String.valueOf(maxIndex);
+ while (res.length() < target.length()) {
+ res.insert(0, "0");
+ }
+ return res.toString();
+ }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
index cebf730..deca37d 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.MeasurementGroup;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -56,6 +57,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.tsfile.utils.FileGenerator.generateIndexString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -223,6 +225,13 @@ public class MetadataIndexConstructorTest {
}
// 4.2 make sure timeseries in order
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ Iterator<Pair<String, Boolean>> iterator = reader.getAllDevicesIteratorWithIsAligned();
+ while (iterator.hasNext()) {
+ for (String correctDevice : correctDevices) {
+ assertEquals(correctDevice, iterator.next().left);
+ }
+ }
+
Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
reader.getAllTimeseriesMetadata();
for (int j = 0; j < actualDevices.size(); j++) {
@@ -481,20 +490,4 @@ public class MetadataIndexConstructorTest {
fail(e.getMessage());
}
}
-
- /**
- * generate curIndex string, use "0" on left to make sure align
- *
- * @param curIndex current index
- * @param maxIndex max index
- * @return curIndex's string
- */
- private String generateIndexString(int curIndex, int maxIndex) {
- StringBuilder res = new StringBuilder(String.valueOf(curIndex));
- String target = String.valueOf(maxIndex);
- while (res.length() < target.length()) {
- res.insert(0, "0");
- }
- return res.toString();
- }
}