You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/12/09 14:57:40 UTC
[iotdb] branch experimental/index updated: finish B+ Tree
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch experimental/index
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/experimental/index by this push:
new a532b7a finish B+ Tree
a532b7a is described below
commit a532b7ade4269090a6f6ccf1d402ae5951221f71
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Thu Dec 9 22:56:50 2021 +0800
finish B+ Tree
---
.../src/main/java/org/apache/iotdb/Xianyi.java | 161 ++++++++
example/tsfile/pom.xml | 2 +-
.../org/apache/iotdb/tsfile/TestPreprocess.java | 123 ++++++
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 3 +-
.../iotdb/tsfile/test1835/TsFileRawReadV2.java | 8 +-
.../TsFileAggregation.java} | 48 ++-
.../TsFileRawRead.java} | 26 +-
.../apache/iotdb/tsfile/test1929/TsFileWrite.java | 151 ++++++++
...dataIndexNode.java => MetadataIndexBucket.java} | 14 +-
.../file/metadata/MetadataIndexBucketEntry.java | 70 ++++
.../file/metadata/MetadataIndexConstructorV2.java | 102 ++---
.../tsfile/file/metadata/MetadataIndexNode.java | 4 +
...dataIndexNode.java => MetadataIndexNodeV2.java} | 38 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 46 +++
.../apache/iotdb/tsfile/read/ReadOnlyTsFile.java | 11 +
.../iotdb/tsfile/read/TsFileSequenceReader.java | 210 ++++++++--
.../tsfile/read/controller/IMetadataQuerier.java | 2 +
.../read/controller/MetadataQuerierByFileImpl.java | 72 ++++
.../query/executor/ExecutorWithTimeGenerator.java | 6 +
.../tsfile/read/query/executor/QueryExecutor.java | 2 +
.../tsfile/read/query/executor/TsFileExecutor.java | 49 ++-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 20 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 341 ++++++++++-------
.../write/MetadataIndexConstructorV2Test.java | 424 +++++++++++++++++++++
24 files changed, 1637 insertions(+), 296 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/Xianyi.java b/example/session/src/main/java/org/apache/iotdb/Xianyi.java
new file mode 100644
index 0000000..23d76a1
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/Xianyi.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@SuppressWarnings("squid:S106")
+public class Xianyi {
+
+ private static Session session;
+ public static final String prefix = "root.SC.JZG.00.";
+ public static final String device = "BHE"; // BHE, BHN, BHZ
+ public static final String measurement = "D";
+ public static final String path = prefix + device + "." + measurement;
+
+ private static final String LOCAL_HOST = "127.0.0.1";
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException {
+ session =
+ new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
+ session.open(false);
+
+ // set session fetchSize
+ session.setFetchSize(10000);
+
+ try {
+ session.setStorageGroup("root.SC");
+ } catch (StatementExecutionException e) {
+ if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+ throw e;
+ }
+ }
+
+ if (!session.checkTimeseriesExists(path)) {
+ session.createTimeseries(path, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.GZIP);
+ }
+ try {
+ insertTablet();
+ } catch (ParseException | IOException e) {
+ e.printStackTrace();
+ }
+
+ session.close();
+ }
+
+ /**
+ * insert the data of a device. For each timestamp, the number of measurements is the same.
+ *
+ * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+ */
+ private static void insertTablet()
+ throws IoTDBConnectionException, StatementExecutionException, ParseException, IOException {
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new UnaryMeasurementSchema(measurement, TSDataType.INT32));
+
+ Tablet tablet = new Tablet(prefix + device, schemaList, 100);
+
+ for (int fileId = 1; fileId <= 30; fileId++) {
+ File file =
+ new File(
+ "/Users/samperson1997/git/iotdb/data/2017/SC/JZG/"
+ + measurement
+ + ".D1/SC.JZG.00."
+ + measurement
+ + ".D.2017."
+ + generateIndexString(fileId)
+ + ".txt");
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+
+ String tempString;
+ int idx = 0;
+ long startTime = 0L;
+
+ while ((tempString = reader.readLine()) != null) {
+ tempString = tempString.trim();
+ if (tempString.startsWith("TraceID")
+ || tempString.startsWith("DATA")
+ || tempString.startsWith("Record")
+ || tempString.startsWith("RECORD")
+ || tempString.startsWith("Start")) {
+ continue;
+ }
+ if (tempString.startsWith("Segment")) {
+ String dateStr = tempString.split(" ")[1];
+ Date date = sdf.parse(dateStr.substring(0, 10) + " " + dateStr.substring(11, 19));
+ startTime = date.getTime();
+ idx = 0;
+ continue;
+ }
+ String[] values = tempString.split(" ");
+ for (String s : values) {
+ if (s.equals("")) {
+ continue;
+ }
+ long time = startTime + idx * 10;
+ int value = Integer.parseInt(s);
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, time);
+ tablet.addValue(measurement, rowIndex, value);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertTablet(tablet, true);
+ tablet.reset();
+ }
+ idx++;
+ }
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertTablet(tablet);
+ tablet.reset();
+ }
+ }
+
+ private static String generateIndexString(int curIndex) {
+ StringBuilder res = new StringBuilder(String.valueOf(curIndex));
+ String target = String.valueOf(100);
+ while (res.length() < target.length()) {
+ res.insert(0, "0");
+ }
+ return res.toString();
+ }
+}
diff --git a/example/tsfile/pom.xml b/example/tsfile/pom.xml
index 45a9e33..2db4040 100644
--- a/example/tsfile/pom.xml
+++ b/example/tsfile/pom.xml
@@ -51,7 +51,7 @@
<configuration>
<archive>
<manifest>
- <mainClass>org.apache.iotdb.tsfile.test1835.TsFileAggregationV2</mainClass>
+ <mainClass>org.apache.iotdb.tsfile.test1929.TsFileAggregation</mainClass>
</manifest>
</archive>
<descriptorRefs>
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TestPreprocess.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TestPreprocess.java
new file mode 100644
index 0000000..ee34000
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TestPreprocess.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class TestPreprocess {
+
+ public static final String deviceId = "root.SC.JZG.00";
+ public static final String measurementId = "BHZ";
+
+ public static void main(String[] args) {
+
+ try {
+ File outputFile =
+ new File(
+ "/Users/samperson1997/git/iotdb/data/data/sequence/root.SC/0/0/"
+ + measurementId
+ + ".tsfile");
+ TsFileWriter tsFileWriter = new TsFileWriter(outputFile);
+ tsFileWriter.registerTimeseries(
+ new Path(deviceId, measurementId),
+ new UnaryMeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN));
+
+ for (int fileId = 1; fileId <= 30; fileId++) {
+ File file =
+ new File(
+ "/Users/samperson1997/git/iotdb/data/2017/SC/JZG/"
+ + measurementId
+ + ".D1/SC.JZG.00."
+ + measurementId
+ + ".D.2017."
+ + generateIndexString(fileId)
+ + ".txt");
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+
+ String tempString;
+ int idx = 0;
+ long startTime = 0L;
+
+ while ((tempString = reader.readLine()) != null) {
+ tempString = tempString.trim();
+ if (tempString.startsWith("TraceID")
+ || tempString.startsWith("DATA")
+ || tempString.startsWith("Record")
+ || tempString.startsWith("RECORD")
+ || tempString.startsWith("Start")) {
+ continue;
+ }
+ if (tempString.startsWith("Segment")) {
+ String dateStr = tempString.split(" ")[1];
+ Date date = sdf.parse(dateStr.substring(0, 10) + " " + dateStr.substring(11, 19));
+ startTime = date.getTime();
+ idx = 0;
+ continue;
+ }
+ String[] values = tempString.split(" ");
+ for (String s : values) {
+ if (s.equals("")) {
+ continue;
+ }
+ long time = startTime + idx * 10;
+ int value = Integer.parseInt(s);
+ TSRecord tsRecord = new TSRecord(time, deviceId);
+ DataPoint dPoint1 = new IntDataPoint(measurementId, value);
+ tsRecord.addTuple(dPoint1);
+ tsFileWriter.write(tsRecord);
+ idx++;
+ }
+ }
+ reader.close();
+ }
+
+ tsFileWriter.close();
+ } catch (IOException | ParseException | WriteProcessException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static String generateIndexString(int curIndex) {
+ StringBuilder res = new StringBuilder(String.valueOf(curIndex));
+ String target = String.valueOf(100);
+ while (res.length() < target.length()) {
+ res.insert(0, "0");
+ }
+ return res.toString();
+ }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 788100b..b0c181f 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -46,8 +46,7 @@ public class TsFileSequenceRead {
"squid:S106"
}) // Suppress high Cognitive Complexity and Standard outputs warning
public static void main(String[] args) throws IOException {
- String filename =
- "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/0/1832/test5.tsfile";
+ String filename = "/Users/samperson1997/git/iotdb/data/data/sequence/root.SC/0/0/BHN.tsfile";
if (args.length >= 1) {
filename = args[0];
}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
index 96b2d59..587f86a 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
@@ -58,7 +58,7 @@ public class TsFileRawReadV2 {
cl = parser.parse(opts, args);
deviceNum = Integer.parseInt(cl.getOptionValue("d"));
sensorNum = Integer.parseInt(cl.getOptionValue("m"));
- fileNum = Integer.parseInt(cl.getOptionValue("f"));
+ fileNum = 1; // Integer.parseInt(cl.getOptionValue("f"));
} catch (Exception e) {
e.printStackTrace();
}
@@ -77,16 +77,16 @@ public class TsFileRawReadV2 {
// raw data query
try (TsFileSequenceReader reader = new TsFileSequenceReader(path, false);
- ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+ ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader, 0)) {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path(DEVICE1, "sensor_1"));
QueryExpression queryExpression = QueryExpression.create(paths, null);
- QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+ QueryDataSet queryDataSet = readTsFile.query(queryExpression, 0);
while (queryDataSet.hasNext()) {
- queryDataSet.next();
+ System.out.println(queryDataSet.next());
}
}
}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileAggregation.java
similarity index 66%
copy from example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
copy to example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileAggregation.java
index 96b2d59..c9ef669 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileAggregation.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.test1835;
+package org.apache.iotdb.tsfile.test1929;
-import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
@@ -31,15 +31,17 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import java.io.IOException;
-import java.util.ArrayList;
-public class TsFileRawReadV2 {
+public class TsFileAggregation {
private static final String DEVICE1 = "device_1";
public static int deviceNum;
public static int sensorNum;
+ public static int treeType; // 0=Zesong Tree, 1=B+ Tree
public static int fileNum;
+ private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
public static void main(String[] args) throws IOException {
Options opts = new Options();
Option deviceNumOption =
@@ -51,6 +53,12 @@ public class TsFileRawReadV2 {
Option fileNumOption =
OptionBuilder.withArgName("args").withLongOpt("fileNum").hasArg().create("f");
opts.addOption(fileNumOption);
+ Option treeTypeOption =
+ OptionBuilder.withArgName("args").withLongOpt("treeType").hasArg().create("t");
+ opts.addOption(treeTypeOption);
+ Option degreeOption =
+ OptionBuilder.withArgName("args").withLongOpt("degree").hasArg().create("c");
+ opts.addOption(degreeOption);
BasicParser parser = new BasicParser();
CommandLine cl;
@@ -59,6 +67,8 @@ public class TsFileRawReadV2 {
deviceNum = Integer.parseInt(cl.getOptionValue("d"));
sensorNum = Integer.parseInt(cl.getOptionValue("m"));
fileNum = Integer.parseInt(cl.getOptionValue("f"));
+ treeType = 1; // Integer.parseInt(cl.getOptionValue("t"));
+ config.setMaxDegreeOfIndexNode(Integer.parseInt(cl.getOptionValue("c")));
} catch (Exception e) {
e.printStackTrace();
}
@@ -67,7 +77,9 @@ public class TsFileRawReadV2 {
for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
// file path
String path =
- "/data/szs/data/data/sequence/root.sg/0/"
+ "/data/szs/data/data/sequence/root.b/"
+ + config.getMaxDegreeOfIndexNode()
+ + "/"
+ deviceNum
+ "."
+ sensorNum
@@ -75,23 +87,19 @@ public class TsFileRawReadV2 {
+ fileIndex
+ ".tsfile";
- // raw data query
- try (TsFileSequenceReader reader = new TsFileSequenceReader(path, false);
- ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
-
- ArrayList<Path> paths = new ArrayList<>();
- paths.add(new Path(DEVICE1, "sensor_1"));
-
- QueryExpression queryExpression = QueryExpression.create(paths, null);
-
- QueryDataSet queryDataSet = readTsFile.query(queryExpression);
- while (queryDataSet.hasNext()) {
- queryDataSet.next();
+ // aggregation query
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(path, false)) {
+ Path seriesPath = new Path(DEVICE1, "sensor_1");
+ TimeseriesMetadata timeseriesMetadata = null;
+ if (treeType == 0) {
+ timeseriesMetadata = reader.readTimeseriesMetadataV4(seriesPath, false);
+ } else if (treeType == 1) {
+ timeseriesMetadata = reader.readTimeseriesMetadataV5(seriesPath, false);
}
+ long count = timeseriesMetadata.getStatistics().getCount();
}
}
long totalTime = (System.nanoTime() - totalStartTime) / 1000_000;
- System.out.println("Total raw read cost time: " + totalTime + "ms");
System.out.println("Average cost time: " + (double) totalTime / (double) fileNum + "ms");
}
}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileRawRead.java
similarity index 77%
copy from example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
copy to example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileRawRead.java
index 96b2d59..2093536 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileRawRead.java
@@ -16,8 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.test1835;
+package org.apache.iotdb.tsfile.test1929;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -33,13 +35,16 @@ import org.apache.commons.cli.Options;
import java.io.IOException;
import java.util.ArrayList;
-public class TsFileRawReadV2 {
+public class TsFileRawRead {
private static final String DEVICE1 = "device_1";
public static int deviceNum;
public static int sensorNum;
+ public static int treeType; // 0=Zesong Tree, 1=B+ Tree
public static int fileNum;
+ private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
public static void main(String[] args) throws IOException {
Options opts = new Options();
Option deviceNumOption =
@@ -51,6 +56,12 @@ public class TsFileRawReadV2 {
Option fileNumOption =
OptionBuilder.withArgName("args").withLongOpt("fileNum").hasArg().create("f");
opts.addOption(fileNumOption);
+ Option treeTypeOption =
+ OptionBuilder.withArgName("args").withLongOpt("treeType").hasArg().create("t");
+ opts.addOption(treeTypeOption);
+ Option degreeOption =
+ OptionBuilder.withArgName("args").withLongOpt("degree").hasArg().create("c");
+ opts.addOption(degreeOption);
BasicParser parser = new BasicParser();
CommandLine cl;
@@ -59,6 +70,8 @@ public class TsFileRawReadV2 {
deviceNum = Integer.parseInt(cl.getOptionValue("d"));
sensorNum = Integer.parseInt(cl.getOptionValue("m"));
fileNum = Integer.parseInt(cl.getOptionValue("f"));
+ treeType = 1; // Integer.parseInt(cl.getOptionValue("t"));
+ config.setMaxDegreeOfIndexNode(Integer.parseInt(cl.getOptionValue("c")));
} catch (Exception e) {
e.printStackTrace();
}
@@ -67,7 +80,9 @@ public class TsFileRawReadV2 {
for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
// file path
String path =
- "/data/szs/data/data/sequence/root.sg/0/"
+ "/data/szs/data/data/sequence/root.b/"
+ + config.getMaxDegreeOfIndexNode()
+ + "/"
+ deviceNum
+ "."
+ sensorNum
@@ -77,21 +92,20 @@ public class TsFileRawReadV2 {
// raw data query
try (TsFileSequenceReader reader = new TsFileSequenceReader(path, false);
- ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+ ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader, treeType)) {
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path(DEVICE1, "sensor_1"));
QueryExpression queryExpression = QueryExpression.create(paths, null);
- QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+ QueryDataSet queryDataSet = readTsFile.query(queryExpression, treeType);
while (queryDataSet.hasNext()) {
queryDataSet.next();
}
}
}
long totalTime = (System.nanoTime() - totalStartTime) / 1000_000;
- System.out.println("Total raw read cost time: " + totalTime + "ms");
System.out.println("Average cost time: " + (double) totalTime / (double) fileNum + "ms");
}
}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileWrite.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileWrite.java
new file mode 100644
index 0000000..2b9ac8c
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1929/TsFileWrite.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.test1929;
+
+import org.apache.iotdb.tsfile.Constant;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import java.io.File;
+import java.util.Random;
+
+/**
+ * An example of writing data with TSRecord to TsFile It uses the interface: public void
+ * addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException
+ */
+public class TsFileWrite {
+ public static int deviceNum;
+ public static int sensorNum;
+ public static int fileNum;
+ public static int treeType; // 0=Zesong Tree, 1=B+ Tree
+ public static int pointNum = 100;
+
+ private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
+ public static void main(String[] args) {
+ Options opts = new Options();
+ Option deviceNumOption =
+ OptionBuilder.withArgName("args").withLongOpt("deviceNum").hasArg().create("d");
+ opts.addOption(deviceNumOption);
+ Option sensorNumOption =
+ OptionBuilder.withArgName("args").withLongOpt("sensorNum").hasArg().create("m");
+ opts.addOption(sensorNumOption);
+ Option fileNumOption =
+ OptionBuilder.withArgName("args").withLongOpt("fileNum").hasArg().create("f");
+ opts.addOption(fileNumOption);
+ Option treeTypeOption =
+ OptionBuilder.withArgName("args").withLongOpt("treeType").hasArg().create("t");
+ opts.addOption(treeTypeOption);
+ Option degreeOption =
+ OptionBuilder.withArgName("args").withLongOpt("degree").hasArg().create("c");
+ opts.addOption(degreeOption);
+
+ int originMaxDegreeOfIndexNode = config.getMaxDegreeOfIndexNode();
+
+ BasicParser parser = new BasicParser();
+ CommandLine cl;
+ try {
+ cl = parser.parse(opts, args);
+ deviceNum = Integer.parseInt(cl.getOptionValue("d"));
+ sensorNum = Integer.parseInt(cl.getOptionValue("m"));
+ fileNum = Integer.parseInt(cl.getOptionValue("f"));
+ treeType = 1; // Integer.parseInt(cl.getOptionValue("t"));
+ config.setMaxDegreeOfIndexNode(Integer.parseInt(cl.getOptionValue("c")));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+ try {
+ String path =
+ "/data/szs/data/data/sequence/root.b/"
+ + config.getMaxDegreeOfIndexNode()
+ + "/"
+ + deviceNum
+ + "."
+ + sensorNum
+ + "/test"
+ + fileIndex
+ + ".tsfile";
+ File f = FSFactoryProducer.getFSFactory().getFile(path);
+ if (f.exists()) {
+ f.delete();
+ }
+
+ try {
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+ for (int i = 1; i <= deviceNum; i++) {
+ for (int j = 1; j <= sensorNum; j++) {
+ Path path1 = new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_ + j);
+ tsFileWriter.registerTimeseries(
+ path1,
+ new UnaryMeasurementSchema(
+ Constant.SENSOR_ + j, TSDataType.INT64, TSEncoding.RLE));
+ }
+ }
+ // construct TSRecord
+ for (int j = 1; j <= deviceNum; j++) {
+ for (int i = 1; i <= pointNum; i++) {
+ TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + j);
+ for (int t = 1; t <= sensorNum; t++) {
+ DataPoint dPoint1 =
+ new LongDataPoint(Constant.SENSOR_ + t, new Random().nextLong());
+ tsRecord.addTuple(dPoint1);
+ }
+ // write TSRecord
+ tsFileWriter.write(tsRecord);
+ if (i % 100 == 0) {
+ tsFileWriter.flushAllChunkGroups();
+ }
+ }
+ }
+ if (treeType == 0) {
+ tsFileWriter.close();
+ } else if (treeType == 1) {
+ tsFileWriter.closeBTree();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.out.println(e.getMessage());
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.out.println(e.getMessage());
+ }
+ }
+ config.setMaxDegreeOfIndexNode(originMaxDegreeOfIndexNode);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucket.java
similarity index 91%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucket.java
index 3f6f633..fc78087 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucket.java
@@ -26,28 +26,32 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class MetadataIndexNode {
+public class MetadataIndexBucket {
private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private final List<MetadataIndexEntry> children;
private long endOffset;
+ private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
/** type of the child node at offset */
private final MetadataIndexNodeType nodeType;
- public MetadataIndexNode(MetadataIndexNodeType nodeType) {
+ public MetadataIndexBucket(MetadataIndexNodeType nodeType) {
children = new ArrayList<>();
endOffset = -1L;
this.nodeType = nodeType;
}
- public MetadataIndexNode(
+ public MetadataIndexBucket(
List<MetadataIndexEntry> children, long endOffset, MetadataIndexNodeType nodeType) {
this.children = children;
this.endOffset = endOffset;
@@ -96,7 +100,7 @@ public class MetadataIndexNode {
return byteLen;
}
- public static MetadataIndexNode deserializeFrom(ByteBuffer buffer) {
+ public static MetadataIndexBucket deserializeFrom(ByteBuffer buffer) {
List<MetadataIndexEntry> children = new ArrayList<>();
int size = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
for (int i = 0; i < size; i++) {
@@ -105,7 +109,7 @@ public class MetadataIndexNode {
long offset = ReadWriteIOUtils.readLong(buffer);
MetadataIndexNodeType nodeType =
MetadataIndexNodeType.deserialize(ReadWriteIOUtils.readByte(buffer));
- return new MetadataIndexNode(children, offset, nodeType);
+ return new MetadataIndexBucket(children, offset, nodeType);
}
public Pair<MetadataIndexEntry, Long> getChildIndexEntry(String key, boolean exactSearch) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucketEntry.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucketEntry.java
new file mode 100644
index 0000000..09282ae
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexBucketEntry.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.file.metadata;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class MetadataIndexBucketEntry {
+
+ private String path;
+ private long offset;
+
+ public MetadataIndexBucketEntry(String path, long offset) {
+ this.path = path;
+ this.offset = offset;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public String toString() {
+ return "<" + path + "," + offset + ">";
+ }
+
+ public int serializeTo(OutputStream outputStream) throws IOException {
+ int byteLen = 0;
+ byteLen += ReadWriteIOUtils.writeVar(path, outputStream);
+ byteLen += ReadWriteIOUtils.write(offset, outputStream);
+ return byteLen;
+ }
+
+ public static MetadataIndexBucketEntry deserializeFrom(ByteBuffer buffer) {
+ String name = ReadWriteIOUtils.readVarIntString(buffer);
+ long offset = ReadWriteIOUtils.readLong(buffer);
+ return new MetadataIndexBucketEntry(name, offset);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java
index 8b9ea6a..970475e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import java.io.IOException;
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
-import java.util.TreeMap;
public class MetadataIndexConstructorV2 {
@@ -44,78 +43,53 @@ public class MetadataIndexConstructorV2 {
* Construct metadata index tree
*
* @param deviceTimeseriesMetadataMap device => TimeseriesMetadata list
- * @param out tsfile output
+ * @param tsFileOutput tsfile output
+ * @param metadataIndexOutput metadataIndex output
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public static MetadataIndexNode constructMetadataIndex(
- Map<String, List<TimeseriesMetadataV2>> deviceTimeseriesMetadataMap, TsFileOutput out)
+ public static MetadataIndexNodeV2 constructMetadataIndex(
+ Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap,
+ TsFileOutput tsFileOutput,
+ TsFileOutput metadataIndexOutput)
throws IOException {
- Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
-
+ Queue<MetadataIndexNodeV2> metadataIndexQueue = new ArrayDeque<>();
+ MetadataIndexNodeV2 currentIndexNode = new MetadataIndexNodeV2();
+ currentIndexNode.setLeaf(true);
+ int serializedTimeseriesMetadataNum = 0;
+ boolean isNewDevice;
// for timeseriesMetadata of each device
- for (Entry<String, List<TimeseriesMetadataV2>> entry : deviceTimeseriesMetadataMap.entrySet()) {
+ for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
if (entry.getValue().isEmpty()) {
continue;
}
- Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
- TimeseriesMetadataV2 timeseriesMetadata;
- MetadataIndexNode currentIndexNode =
- new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
- int serializedTimeseriesMetadataNum = 0;
+ isNewDevice = true;
+ TimeseriesMetadata timeseriesMetadata;
for (int i = 0; i < entry.getValue().size(); i++) {
timeseriesMetadata = entry.getValue().get(i);
if (serializedTimeseriesMetadataNum == 0
- || serializedTimeseriesMetadataNum >= config.getMaxDegreeOfIndexNode()) {
+ || serializedTimeseriesMetadataNum >= config.getMaxDegreeOfIndexNode()
+ || isNewDevice) {
if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
- currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexQueue, tsFileOutput);
+ currentIndexNode = new MetadataIndexNodeV2();
+ currentIndexNode.setLeaf(true);
}
currentIndexNode.addEntry(
- new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ new MetadataIndexEntry(
+ entry.getKey()
+ + TsFileConstant.PATH_SEPARATOR
+ + timeseriesMetadata.getMeasurementId(),
+ tsFileOutput.getPosition()));
serializedTimeseriesMetadataNum = 0;
+ isNewDevice = false;
}
- timeseriesMetadata.serializeTo(out.wrapAsStream());
+ timeseriesMetadata.serializeTo(tsFileOutput.wrapAsStream());
serializedTimeseriesMetadataNum++;
}
- addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
- deviceMetadataIndexMap.put(
- entry.getKey(),
- generateRootNode(
- measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
- }
-
- // if not exceed the max child nodes num, ignore the device index and directly point to the
- // measurement
- if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
- MetadataIndexNode metadataIndexNode =
- new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
- for (Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
- metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
- entry.getValue().serializeTo(out.wrapAsStream());
- }
- metadataIndexNode.setEndOffset(out.getPosition());
- return metadataIndexNode;
}
-
- // else, build level index for devices
- Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
- MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
-
- for (Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
- // when constructing from internal node, each node is related to an entry
- if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
- currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
- }
- currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
- entry.getValue().serializeTo(out.wrapAsStream());
- }
- addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
- MetadataIndexNode deviceMetadataIndexNode =
- generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
- deviceMetadataIndexNode.setEndOffset(out.getPosition());
- return deviceMetadataIndexNode;
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexQueue, tsFileOutput); // ?
+ return generateRootNode(metadataIndexQueue, metadataIndexOutput);
}
/**
@@ -125,36 +99,34 @@ public class MetadataIndexConstructorV2 {
*
* @param metadataIndexNodeQueue queue of metadataIndexNode
* @param out tsfile output
- * @param type MetadataIndexNode type
*/
- private static MetadataIndexNode generateRootNode(
- Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
- throws IOException {
+ private static MetadataIndexNodeV2 generateRootNode(
+ Queue<MetadataIndexNodeV2> metadataIndexNodeQueue, TsFileOutput out) throws IOException {
int queueSize = metadataIndexNodeQueue.size();
- MetadataIndexNode metadataIndexNode;
- MetadataIndexNode currentIndexNode = new MetadataIndexNode(type);
+ MetadataIndexNodeV2 metadataIndexNode;
+ MetadataIndexNodeV2 currentIndexNode = new MetadataIndexNodeV2();
while (queueSize != 1) {
for (int i = 0; i < queueSize; i++) {
metadataIndexNode = metadataIndexNodeQueue.poll();
// when constructing from internal node, each node is related to an entry
if (currentIndexNode.isFull()) {
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
- currentIndexNode = new MetadataIndexNode(type);
+ currentIndexNode = new MetadataIndexNodeV2();
}
currentIndexNode.addEntry(
new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
metadataIndexNode.serializeTo(out.wrapAsStream());
}
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
- currentIndexNode = new MetadataIndexNode(type);
+ currentIndexNode = new MetadataIndexNodeV2();
queueSize = metadataIndexNodeQueue.size();
}
return metadataIndexNodeQueue.poll();
}
private static void addCurrentIndexNodeToQueue(
- MetadataIndexNode currentIndexNode,
- Queue<MetadataIndexNode> metadataIndexNodeQueue,
+ MetadataIndexNodeV2 currentIndexNode,
+ Queue<MetadataIndexNodeV2> metadataIndexNodeQueue,
TsFileOutput out)
throws IOException {
currentIndexNode.setEndOffset(out.getPosition());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f633..f9c1be9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -37,6 +40,7 @@ public class MetadataIndexNode {
private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private final List<MetadataIndexEntry> children;
private long endOffset;
+ private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
/** type of the child node at offset */
private final MetadataIndexNodeType nodeType;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeV2.java
similarity index 82%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeV2.java
index 3f6f633..cad3ce7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeV2.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -32,26 +31,23 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class MetadataIndexNode {
+public class MetadataIndexNodeV2 {
private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private final List<MetadataIndexEntry> children;
private long endOffset;
+ private boolean isLeaf;
- /** type of the child node at offset */
- private final MetadataIndexNodeType nodeType;
-
- public MetadataIndexNode(MetadataIndexNodeType nodeType) {
+ public MetadataIndexNodeV2() {
children = new ArrayList<>();
endOffset = -1L;
- this.nodeType = nodeType;
+ isLeaf = false;
}
- public MetadataIndexNode(
- List<MetadataIndexEntry> children, long endOffset, MetadataIndexNodeType nodeType) {
+ public MetadataIndexNodeV2(List<MetadataIndexEntry> children, long endOffset, boolean isLeaf) {
this.children = children;
this.endOffset = endOffset;
- this.nodeType = nodeType;
+ this.isLeaf = isLeaf;
}
public List<MetadataIndexEntry> getChildren() {
@@ -66,10 +62,6 @@ public class MetadataIndexNode {
this.endOffset = endOffset;
}
- public MetadataIndexNodeType getNodeType() {
- return nodeType;
- }
-
public void addEntry(MetadataIndexEntry metadataIndexEntry) {
this.children.add(metadataIndexEntry);
}
@@ -78,6 +70,14 @@ public class MetadataIndexNode {
return children.size() >= config.getMaxDegreeOfIndexNode();
}
+ public boolean isLeaf() {
+ return isLeaf;
+ }
+
+ public void setLeaf(boolean leaf) {
+ isLeaf = leaf;
+ }
+
MetadataIndexEntry peek() {
if (children.isEmpty()) {
return null;
@@ -92,20 +92,20 @@ public class MetadataIndexNode {
byteLen += metadataIndexEntry.serializeTo(outputStream);
}
byteLen += ReadWriteIOUtils.write(endOffset, outputStream);
- byteLen += ReadWriteIOUtils.write(nodeType.serialize(), outputStream);
+ byteLen += ReadWriteIOUtils.write(isLeaf, outputStream);
return byteLen;
}
- public static MetadataIndexNode deserializeFrom(ByteBuffer buffer) {
+ public static MetadataIndexNodeV2 deserializeFrom(ByteBuffer buffer) {
List<MetadataIndexEntry> children = new ArrayList<>();
int size = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
for (int i = 0; i < size; i++) {
children.add(MetadataIndexEntry.deserializeFrom(buffer));
}
long offset = ReadWriteIOUtils.readLong(buffer);
- MetadataIndexNodeType nodeType =
- MetadataIndexNodeType.deserialize(ReadWriteIOUtils.readByte(buffer));
- return new MetadataIndexNode(children, offset, nodeType);
+ boolean isLeaf = ReadWriteIOUtils.readBool(buffer);
+
+ return new MetadataIndexNodeV2(children, offset, isLeaf);
}
public Pair<MetadataIndexEntry, Long> getChildIndexEntry(String key, boolean exactSearch) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2..2fe8e30 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -38,6 +38,7 @@ public class TsFileMetadata {
// List of <name, offset, childMetadataIndexType>
private MetadataIndexNode metadataIndex;
+ private MetadataIndexNodeV2 metadataIndexV2;
// offset of MetaMarker.SEPARATOR
private long metaOffset;
@@ -69,6 +70,27 @@ public class TsFileMetadata {
return fileMetaData;
}
  /**
   * Deserialize the V2 (B+ tree) file-level metadata: the root index node, the metaOffset,
   * and an optional trailing bloom filter.
   *
   * @param buffer buffer positioned at the start of the serialized V2 metadata
   * @return the reconstructed TsFileMetadata (only V2 index fields populated)
   */
  public static TsFileMetadata deserializeFromV2(ByteBuffer buffer) {
    TsFileMetadata fileMetaData = new TsFileMetadata();

    // metadataIndex: root node of the B+ tree index
    fileMetaData.metadataIndexV2 = MetadataIndexNodeV2.deserializeFrom(buffer);

    // metaOffset: position of MetaMarker.SEPARATOR in the data file
    long metaOffset = ReadWriteIOUtils.readLong(buffer);
    fileMetaData.setMetaOffset(metaOffset);

    // read bloom filter — optional: present only if bytes remain after metaOffset
    if (buffer.hasRemaining()) {
      // raw bit array first, then the two sizing parameters
      byte[] bytes = ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(buffer);
      int filterSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
      int hashFunctionSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
      fileMetaData.bloomFilter = BloomFilter.buildBloomFilter(bytes, filterSize, hashFunctionSize);
    }

    return fileMetaData;
  }
+
public BloomFilter getBloomFilter() {
return bloomFilter;
}
@@ -99,6 +121,22 @@ public class TsFileMetadata {
return byteLen;
}
  /**
   * Serialize the V2 (B+ tree) file-level metadata: root index node followed by metaOffset.
   * The bloom filter is serialized separately (see the bloom-filter serialization method).
   *
   * @param outputStream destination stream
   * @return the number of bytes written
   * @throws IOException if the stream write fails
   */
  public int serializeToV2(OutputStream outputStream) throws IOException {
    int byteLen = 0;

    // metadataIndex
    if (metadataIndexV2 != null) {
      byteLen += metadataIndexV2.serializeTo(outputStream);
    } else {
      // NOTE(review): writes a plain int 0 as placeholder, but deserializeFromV2 always expects
      // a serialized MetadataIndexNodeV2 here — confirm the null case is never read back
      byteLen += ReadWriteIOUtils.write(0, outputStream);
    }

    // metaOffset
    byteLen += ReadWriteIOUtils.write(metaOffset, outputStream);

    return byteLen;
  }
+
/**
* use the given outputStream to serialize bloom filter.
*
@@ -146,7 +184,15 @@ public class TsFileMetadata {
return metadataIndex;
}
  /** @return the root node of the V2 (B+ tree) metadata index, or null if not set */
  public MetadataIndexNodeV2 getMetadataIndexV2() {
    return metadataIndexV2;
  }
+
public void setMetadataIndex(MetadataIndexNode metadataIndex) {
this.metadataIndex = metadataIndex;
}
+
  /** Set the root node of the V2 (B+ tree) metadata index. */
  public void setMetadataIndex(MetadataIndexNodeV2 metadataIndex) {
    this.metadataIndexV2 = metadataIndex;
  }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
index ec6d105..8d5fed1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
@@ -43,10 +43,21 @@ public class ReadOnlyTsFile implements AutoCloseable {
tsFileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader);
}
  /**
   * Create a reader whose metadata querier uses the selected index tree variant.
   *
   * @param fileReader underlying sequence reader
   * @param treeType index variant: 0 = "Zesong" tree, 1 = B+ tree (see MetadataQuerierByFileImpl)
   * @throws IOException if reading the file metadata fails
   */
  public ReadOnlyTsFile(TsFileSequenceReader fileReader, int treeType) throws IOException {
    this.fileReader = fileReader;
    this.metadataQuerier = new MetadataQuerierByFileImpl(fileReader, treeType);
    this.chunkLoader = new CachedChunkLoaderImpl(fileReader);
    tsFileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader);
  }
+
public QueryDataSet query(QueryExpression queryExpression) throws IOException {
return tsFileExecutor.execute(queryExpression);
}
  /**
   * Execute the query through the treeType-aware executor path.
   *
   * @param treeType index variant forwarded to the executor — presumably 0 = Zesong tree,
   *     1 = B+ tree, mirroring MetadataQuerierByFileImpl; TODO confirm in TsFileExecutor
   */
  public QueryDataSet query(QueryExpression queryExpression, int treeType) throws IOException {
    return tsFileExecutor.execute(queryExpression, treeType);
  }
+
public QueryDataSet query(
QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset)
throws IOException {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 712a812..9d3e4fa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
@@ -32,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNodeV2;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadataV2;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
@@ -133,14 +135,14 @@ public class TsFileSequenceReader implements AutoCloseable {
if (FSFactoryProducer.getFSFactory().getFile(file + ".index").exists()) {
metadataIndexInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file + ".index");
}
- // try {
- // if (loadMetadataSize) {
- // loadMetadataSize();
- // }
- // } catch (Throwable e) {
- // tsFileInput.close();
- // throw e;
- // }
+ try {
+ if (loadMetadataSize) {
+ loadMetadataSize();
+ }
+ } catch (Throwable e) {
+ tsFileInput.close();
+ throw e;
+ }
}
// used in merge resource
@@ -197,19 +199,19 @@ public class TsFileSequenceReader implements AutoCloseable {
public void loadMetadataSize() throws IOException {
ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
- // if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
- tsFileInput.read(
- metadataSize,
- tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
- metadataSize.flip();
- // read file metadata size and position
- fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
- fileMetadataPos =
- tsFileInput.size()
- - TSFileConfig.MAGIC_STRING.getBytes().length
- - Integer.BYTES
- - fileMetadataSize;
- // }
+ if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+ tsFileInput.read(
+ metadataSize,
+ tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+ metadataSize.flip();
+ // read file metadata size and position
+ fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+ fileMetadataPos =
+ tsFileInput.size()
+ - TSFileConfig.MAGIC_STRING.getBytes().length
+ - Integer.BYTES
+ - fileMetadataSize;
+ }
}
public long getFileMetadataPos() {
@@ -297,23 +299,39 @@ public class TsFileSequenceReader implements AutoCloseable {
public TsFileMetadata readFileMetadataV2() throws IOException {
try {
if (tsFileMetaData == null) {
- long start = System.nanoTime();
ByteBuffer rootNodeOffsetBuffer = ByteBuffer.allocate(Long.BYTES);
metadataIndexInput.read(rootNodeOffsetBuffer, metadataIndexInput.size() - Long.BYTES);
rootNodeOffsetBuffer.flip();
long rootNodeOffset = ReadWriteIOUtils.readLong(rootNodeOffsetBuffer);
- resourceLogger.debug(
- "ReadFileMetadataV2 " + (System.nanoTime() - start) / 1000_000L + " ms");
-
tsFileMetaData =
TsFileMetadataV2.deserializeFrom(
readData(
rootNodeOffset,
FSFactoryProducer.getFSFactory().getFile(this.file + ".index").length(),
metadataIndexInput));
- resourceLogger.debug(
- "ReadFileMetadataV2 " + (System.nanoTime() - start) / 1000_000L + " ms");
+ }
+ } catch (BufferOverflowException e) {
+ logger.error("Something error happened while reading file metadata of file {}", file);
+ throw e;
+ }
+ return tsFileMetaData;
+ }
+
+ public TsFileMetadata readFileMetadataV3() throws IOException {
+ try {
+ if (tsFileMetaData == null) {
+ ByteBuffer rootNodeOffsetBuffer = ByteBuffer.allocate(Long.BYTES);
+ metadataIndexInput.read(rootNodeOffsetBuffer, metadataIndexInput.size() - Long.BYTES);
+ rootNodeOffsetBuffer.flip();
+
+ long rootNodeOffset = ReadWriteIOUtils.readLong(rootNodeOffsetBuffer);
+ tsFileMetaData =
+ TsFileMetadata.deserializeFromV2(
+ readData(
+ rootNodeOffset,
+ FSFactoryProducer.getFSFactory().getFile(this.file + ".index").length(),
+ metadataIndexInput));
}
} catch (BufferOverflowException e) {
logger.error("Something error happened while reading file metadata of file {}", file);
@@ -512,6 +530,33 @@ public class TsFileSequenceReader implements AutoCloseable {
return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
}
  /**
   * Look up the TimeseriesMetadata of {@code path} through the V2 (B+ tree) metadata index.
   *
   * @param path full timeseries path to search for
   * @param ignoreNotExists currently unused in this body — TODO confirm intended
   * @return the matching TimeseriesMetadata, or null if the path is not found
   * @throws IOException if reading the index or data fails
   */
  public TimeseriesMetadata readTimeseriesMetadataV5(Path path, boolean ignoreNotExists)
      throws IOException {
    // ensure tsFileMetaData (and its V2 root node) is loaded
    readFileMetadataV3();
    MetadataIndexNodeV2 deviceMetadataIndexNode = tsFileMetaData.getMetadataIndexV2();
    // descend from the root to the leaf entry covering the full path (exact search)
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffsetV3(deviceMetadataIndexNode, path.getFullPath(), true);
    if (metadataIndexPair == null) {
      return null;
    }

    // deserialize every TimeseriesMetadata stored in the leaf's byte range [offset, end)
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    while (buffer.hasRemaining()) {
      try {
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
      } catch (BufferOverflowException e) {
        logger.error(
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
        throw e;
      }
    }
    // return null if path does not exist in the TsFile
    // NOTE(review): entries in this index key on full paths, but the search below keys only on
    // measurement — confirm a leaf range cannot span two devices sharing a measurement name
    int searchResult =
        binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement());
    return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
  }
+
/**
* Find the leaf node that contains this vector, return all the needed subSensor and time column
*
@@ -792,6 +837,31 @@ public class TsFileSequenceReader implements AutoCloseable {
return resultTimeseriesMetadataList;
}
  /**
   * Fetch TimeseriesMetadata for a set of full paths via the V2 (B+ tree) metadata index.
   *
   * <p>For each path the whole covering leaf byte-range is deserialized, so the result may
   * include series that were not requested, and paths sharing a leaf produce duplicates —
   * callers are expected to filter.
   *
   * @param paths full timeseries paths to load
   * @return TimeseriesMetadata found in the leaf ranges covering the given paths
   * @throws IOException if reading the index or data fails
   */
  public List<TimeseriesMetadata> readTimeseriesMetadataV4(Set<String> paths) throws IOException {
    // ensure tsFileMetaData (and its V2 root node) is loaded
    readFileMetadataV3();
    MetadataIndexNodeV2 deviceMetadataIndexNode = tsFileMetaData.getMetadataIndexV2();
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();

    for (String path : paths) {
      // non-exact search: locate the leaf entry whose range could contain this path
      Pair<MetadataIndexEntry, Long> metadataIndexPair =
          getMetadataAndEndOffsetV3(deviceMetadataIndexNode, path, false);
      if (metadataIndexPair == null) {
        continue;
      }
      ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
      while (buffer.hasRemaining()) {
        try {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
        } catch (BufferOverflowException e) {
          logger.error(
              "Something error happened while deserializing TimeseriesMetadata of file {}", file);
          throw e;
        }
      }
    }
    return timeseriesMetadataList;
  }
+
protected int binarySearchInTimeseriesMetadataList(
List<TimeseriesMetadata> timeseriesMetadataList, String key) {
int low = 0;
@@ -1015,6 +1085,48 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
  /**
   * Recursively walk the V2 (B+ tree) metadata index and collect TimeseriesMetadata grouped
   * by device into {@code timeseriesMetadataMap}.
   *
   * @param metadataIndex index entry pointing at the byte range decoded in {@code buffer}
   * @param buffer bytes of the child this entry points to
   * @param isLeaf true if the buffer holds serialized TimeseriesMetadata, false for a node
   * @param timeseriesMetadataMap output: first path segment (device) -> TimeseriesMetadata list
   * @param needChunkMetadata whether to also deserialize chunk metadata lists
   * @throws IOException if a child byte range cannot be read
   */
  private void generateMetadataIndexV2(
      MetadataIndexEntry metadataIndex,
      ByteBuffer buffer,
      boolean isLeaf,
      Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap,
      boolean needChunkMetadata)
      throws IOException {
    try {
      if (isLeaf) {
        // leaf: drain the whole range into TimeseriesMetadata objects
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
        while (buffer.hasRemaining()) {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata));
        }
        // entry names are full paths; the first segment is taken as the device key
        // NOTE(review): multi-segment device names would be truncated to their first segment here
        timeseriesMetadataMap
            .computeIfAbsent(
                metadataIndex.getName().split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[0],
                k -> new ArrayList<>())
            .addAll(timeseriesMetadataList);
      } else {
        // internal node: recurse into each child; a child's range ends where the next child's
        // offset begins, and the last child's range ends at the node's endOffset
        MetadataIndexNodeV2 metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
        int metadataIndexListSize = metadataIndexNode.getChildren().size();
        for (int i = 0; i < metadataIndexListSize; i++) {
          long endOffset = metadataIndexNode.getEndOffset();
          if (i != metadataIndexListSize - 1) {
            endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
          }
          ByteBuffer nextBuffer =
              readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
          generateMetadataIndexV2(
              metadataIndexNode.getChildren().get(i),
              nextBuffer,
              metadataIndexNode.isLeaf(),
              timeseriesMetadataMap,
              needChunkMetadata);
        }
      }
    } catch (BufferOverflowException e) {
      logger.error("Something error happened while generating MetadataIndex of file {}", file);
      throw e;
    }
  }
+
/* TimeseriesMetadata don't need deserialize chunk metadata list */
public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
if (tsFileMetaData == null) {
@@ -1041,6 +1153,26 @@ public class TsFileSequenceReader implements AutoCloseable {
return timeseriesMetadataMap;
}
  /**
   * Read all TimeseriesMetadata in the file by walking the V2 (B+ tree) index from its root.
   * Chunk metadata lists are not deserialized (needChunkMetadata = false).
   *
   * @return map of device name -> its TimeseriesMetadata list
   * @throws IOException if reading the index or data fails
   */
  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadataV2() throws IOException {
    if (tsFileMetaData == null) {
      readFileMetadataV3();
    }
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>();
    MetadataIndexNodeV2 metadataIndexNode = tsFileMetaData.getMetadataIndexV2();
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
      // child i's byte range ends at child i+1's offset; the last child ends at endOffset
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexEntryList.size() - 1) {
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
      }
      ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
      generateMetadataIndexV2(
          metadataIndexEntry, buffer, metadataIndexNode.isLeaf(), timeseriesMetadataMap, false);
    }
    return timeseriesMetadataMap;
  }
+
/* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */
private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(String device)
throws IOException {
@@ -1149,6 +1281,25 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
  /**
   * Descend the V2 (B+ tree) index from {@code metadataIndex} to the leaf entry covering
   * {@code path}. Internal levels always use range search; the leaf level honors
   * {@code exactSearch}.
   *
   * @param metadataIndex node to start the search from
   * @param path full timeseries path used as search key
   * @param exactSearch whether the leaf lookup must match the key exactly
   * @return pair of (matching entry, end offset of its byte range); null when an exact leaf
   *     search finds no match
   * @throws IOException if a child byte range cannot be read from the index file
   */
  protected Pair<MetadataIndexEntry, Long> getMetadataAndEndOffsetV3(
      MetadataIndexNodeV2 metadataIndex, String path, boolean exactSearch) throws IOException {
    try {
      if (metadataIndex.isLeaf()) {
        return metadataIndex.getChildIndexEntry(path, exactSearch);
      } else {
        // range search at internal levels, then recurse into the selected child node
        Pair<MetadataIndexEntry, Long> childIndexEntry =
            metadataIndex.getChildIndexEntry(path, false);
        ByteBuffer buffer =
            readData(childIndexEntry.left.getOffset(), childIndexEntry.right, metadataIndexInput);
        return getMetadataAndEndOffsetV3(
            MetadataIndexNodeV2.deserializeFrom(buffer), path, exactSearch);
      }
    } catch (BufferOverflowException e) {
      logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
      throw e;
    }
  }
+
/**
* read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
* This method is not threadsafe.
@@ -1694,6 +1845,11 @@ public class TsFileSequenceReader implements AutoCloseable {
return MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset));
}
+ public MetadataIndexNodeV2 getMetadataIndexNodeV2(long startOffset, long endOffset)
+ throws IOException {
+ return MetadataIndexNodeV2.deserializeFrom(readData(startOffset, endOffset));
+ }
+
/**
* Check if the device has at least one Chunk in this partition
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java
index 25efbad..5b4bc53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java
@@ -49,6 +49,8 @@ public interface IMetadataQuerier {
void loadChunkMetaDatasV3(List<Path> paths) throws IOException;
+ void loadChunkMetaDatasV4(List<Path> paths) throws IOException;
+
/**
* @return the corresponding data type.
* @throws NoMeasurementException if the measurement not exists.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 6f96550..50d2357 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -67,6 +67,23 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
};
}
+ public MetadataQuerierByFileImpl(TsFileSequenceReader tsFileReader, int treeType)
+ throws IOException {
+ this.tsFileReader = tsFileReader;
+ if (treeType == 0) { // Zesong Tree
+ this.fileMetaData = tsFileReader.readFileMetadataV2();
+ } else if (treeType == 1) { // B+ Tree
+ this.fileMetaData = tsFileReader.readFileMetadataV3();
+ }
+ chunkMetaDataCache =
+ new LRUCache<Path, List<ChunkMetadata>>(CACHED_ENTRY_NUMBER) {
+ @Override
+ public List<ChunkMetadata> loadObjectByKey(Path key) throws IOException {
+ return loadChunkMetadata(key);
+ }
+ };
+ }
+
@Override
public List<IChunkMetadata> getChunkMetaDataList(Path path) throws IOException {
return new ArrayList<>(chunkMetaDataCache.get(path));
@@ -286,6 +303,61 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
}
}
+ // Bulk-loads ChunkMetadata for the given paths via the V4 (B+ tree index) read path and warms
+ // chunkMetaDataCache with at most CACHED_ENTRY_NUMBER entries.
+ public void loadChunkMetaDatasV4(List<Path> paths) throws IOException {
+ Set<String> pathSet = new HashSet<>();
+ TreeMap<String, Set<String>> deviceMeasurementsMap = new TreeMap<>();
+
+ // Group requested measurements by device; pathSet keeps full paths for membership tests below.
+ for (Path path : paths) {
+ if (!deviceMeasurementsMap.containsKey(path.getDevice())) {
+ deviceMeasurementsMap.put(path.getDevice(), new HashSet<>());
+ }
+ deviceMeasurementsMap.get(path.getDevice()).add(path.getMeasurement());
+ pathSet.add(path.getFullPath());
+ }
+
+ Map<Path, List<ChunkMetadata>> tempChunkMetaDatas = new HashMap<>();
+
+ int count = 0;
+ boolean enough = false;
+
+ // One flat list of chunk metadata for ALL requested series, in index order.
+ List<TimeseriesMetadata> timeseriesMetaDataList =
+ tsFileReader.readTimeseriesMetadataV4(pathSet);
+ List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
+ chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(timeseriesMetadata));
+ }
+
+ // for cache
+ // NOTE(review): every device is crossed with every chunk in chunkMetadataList; a chunk is
+ // attributed to a device purely by measurement name. If two requested devices share a
+ // measurement name, a chunk could be cached under the wrong device — TODO confirm that
+ // readTimeseriesMetadataV4 cannot return such overlaps (same pattern as the V3 loader).
+ for (Map.Entry<String, Set<String>> deviceMeasurements : deviceMeasurementsMap.entrySet()) {
+ if (enough) {
+ break;
+ }
+ for (ChunkMetadata chunkMetaData : chunkMetadataList) {
+ String currentMeasurement = chunkMetaData.getMeasurementUid();
+ Path path = new Path(deviceMeasurements.getKey(), currentMeasurement);
+ if (!pathSet.contains(path.getFullPath())) {
+ continue;
+ }
+ // add into tempChunkMetaDatas
+ if (!tempChunkMetaDatas.containsKey(path)) {
+ tempChunkMetaDatas.put(path, new ArrayList<>());
+ }
+ tempChunkMetaDatas.get(path).add(chunkMetaData);
+
+ // check cache size, stop when reading enough
+ count++;
+ if (count == CACHED_ENTRY_NUMBER) {
+ enough = true;
+ break;
+ }
+ }
+ }
+
+ // Publish the collected lists into the LRU cache.
+ for (Map.Entry<Path, List<ChunkMetadata>> entry : tempChunkMetaDatas.entrySet()) {
+ chunkMetaDataCache.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+
@Override
public TSDataType getDataType(Path path) throws IOException {
if (tsFileReader.getChunkMetadataList(path) == null
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
index bfb8ea5..ddaa5d0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/ExecutorWithTimeGenerator.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithTimeGenerator;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.read.query.timegenerator.TsFileTimeGenerator;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderByTimestamp;
@@ -96,6 +97,11 @@ public class ExecutorWithTimeGenerator implements QueryExecutor {
selectedPathList, cached, dataTypes, timeGenerator, readersOfSelectedSeries);
}
+ /**
+ * Executes the query with an explicit index-tree type. This executor works on already-loaded
+ * chunk metadata, so {@code treeType} has no effect here; the call delegates to
+ * {@link #execute(QueryExpression)}.
+ */
+ @Override
+ public QueryDataSet execute(QueryExpression queryExpression, int treeType) throws IOException {
+ // Previously returned null, which made any caller dereferencing the result NPE.
+ return execute(queryExpression);
+ }
+
+
public static List<Boolean> markFilterdPaths(
IExpression expression, List<Path> selectedPaths, boolean hasOrNode) {
List<Boolean> cached = new ArrayList<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/QueryExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/QueryExecutor.java
index ea85712..f895a06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/QueryExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/QueryExecutor.java
@@ -26,4 +26,6 @@ import java.io.IOException;
public interface QueryExecutor {
QueryDataSet execute(QueryExpression queryExpression) throws IOException;
+
+ QueryDataSet execute(QueryExpression queryExpression, int treeType) throws IOException;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
index cd547ca..7b195f9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java
@@ -67,8 +67,7 @@ public class TsFileExecutor implements QueryExecutor {
queryExpression.setSelectSeries(filteredSeriesPath);
}
- // metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
- metadataQuerier.loadChunkMetaDatasV3(queryExpression.getSelectedSeries());
+ metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
if (queryExpression.hasQueryFilter()) {
try {
@@ -97,6 +96,52 @@ public class TsFileExecutor implements QueryExecutor {
}
}
+ /**
+ * Executes the query, loading chunk metadata with the on-disk index layout selected by
+ * {@code treeType}.
+ *
+ * @param queryExpression query to run
+ * @param treeType 0 for the "Zesong" tree layout, 1 for the B+ tree layout
+ * @throws IOException on read failure or wrapped query-optimization failure
+ * @throws IllegalArgumentException if {@code treeType} is neither 0 nor 1
+ */
+ @Override
+ public QueryDataSet execute(QueryExpression queryExpression, int treeType) throws IOException {
+ // Drop series the bloom filter proves absent from this file.
+ BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
+ List<Path> filteredSeriesPath = new ArrayList<>();
+ if (bloomFilter != null) {
+ for (Path path : queryExpression.getSelectedSeries()) {
+ if (bloomFilter.contains(path.getFullPath())) {
+ filteredSeriesPath.add(path);
+ }
+ }
+ queryExpression.setSelectSeries(filteredSeriesPath);
+ }
+
+ if (treeType == 0) {
+ metadataQuerier.loadChunkMetaDatasV3(queryExpression.getSelectedSeries());
+ } else if (treeType == 1) {
+ metadataQuerier.loadChunkMetaDatasV4(queryExpression.getSelectedSeries());
+ } else {
+ // Fail fast instead of silently executing with no cached chunk metadata.
+ throw new IllegalArgumentException("Unknown metadata index tree type: " + treeType);
+ }
+ if (queryExpression.hasQueryFilter()) {
+ try {
+ IExpression expression = queryExpression.getExpression();
+ IExpression regularIExpression =
+ ExpressionOptimizer.getInstance()
+ .optimize(expression, queryExpression.getSelectedSeries());
+ queryExpression.setExpression(regularIExpression);
+
+ if (regularIExpression instanceof GlobalTimeExpression) {
+ return execute(
+ queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
+ } else {
+ return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
+ .execute(queryExpression);
+ }
+ } catch (QueryFilterOptimizationException | NoMeasurementException e) {
+ throw new IOException(e);
+ }
+ } else {
+ try {
+ return execute(queryExpression.getSelectedSeries());
+ } catch (NoMeasurementException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+
/**
* Query with the space partition constraint.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 625c38c..4f7ccd7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -350,17 +350,23 @@ public class TsFileWriter implements AutoCloseable {
*/
@Override
public void close() throws IOException {
- // LOG.info("start close file");
- // flushAllChunkGroups();
- // fileWriter.endFile();
- // }
- //
- // public void closeV2() throws IOException {
- LOG.info("start close file IN NEW WAY");
+ LOG.info("start close file");
+ // Flush buffered chunk groups, then finalize with the default V3 metadata index layout.
flushAllChunkGroups();
fileWriter.endFileV3();
}
+ /** Closes the file using the B+ tree metadata index layout (endFileV2) instead of V3. */
+ public void closeBTree() throws IOException {
+ LOG.info("start close file");
+ flushAllChunkGroups();
+ fileWriter.endFileV2();
+ }
+
+ /**
+ * Closes the file using the hash-bucket metadata index layout (endFileHash).
+ * NOTE(review): endFileHash currently duplicates endFileV2, so this is not yet a distinct
+ * layout — TODO confirm intent.
+ */
+ public void closeHash() throws IOException {
+ LOG.info("start close file");
+ flushAllChunkGroups();
+ fileWriter.endFileHash();
+ }
+
/**
* this function is only for Test.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index ac24f39..4d6e70a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,11 +26,12 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexBucket;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructorV2;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNodeV2;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadataV2;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadataV2;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -91,7 +92,6 @@ public class TsFileIOWriter {
// for upgrade tool
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap;
- Map<String, List<TimeseriesMetadataV2>> deviceTimeseriesMetadataV2Map;
// the two longs marks the index range of operations in current MemTable
// and are serialized after MetaMarker.OPERATION_INDEX_RANGE to recover file-level range
@@ -331,93 +331,94 @@ public class TsFileIOWriter {
}
canWrite = false;
}
- //
- // public void endFileV2() throws IOException {
- // long metaOffset = out.getPosition();
- //
- // // serialize the SEPARATOR of MetaData
- // ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
- //
- // // group ChunkMetadata by series
- // // only contains ordinary path and time column of vector series
- // Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
- //
- // // time column -> ChunkMetadataList TreeMap of value columns in vector
- // Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new HashMap<>();
- //
- // for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- // List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
- // int idx = 0;
- // while (idx < chunkMetadatas.size()) {
- // IChunkMetadata chunkMetadata = chunkMetadatas.get(idx);
- // if (chunkMetadata.getMask() == 0) {
- // Path series = new Path(chunkGroupMetadata.getDevice(),
- // chunkMetadata.getMeasurementUid());
- // chunkMetadataListMap.computeIfAbsent(series, k -> new
- // ArrayList<>()).add(chunkMetadata);
- // idx++;
- // } else if (chunkMetadata.isTimeColumn()) {
- // // time column of a vector series
- // Path series = new Path(chunkGroupMetadata.getDevice(),
- // chunkMetadata.getMeasurementUid());
- // chunkMetadataListMap.computeIfAbsent(series, k -> new
- // ArrayList<>()).add(chunkMetadata);
- // idx++;
- // Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector =
- // vectorToPathsMap.computeIfAbsent(series, key -> new TreeMap<>());
- //
- // // value columns of a vector series
- // while (idx < chunkMetadatas.size() && chunkMetadatas.get(idx).isValueColumn()) {
- // chunkMetadata = chunkMetadatas.get(idx);
- // Path vectorSeries =
- // new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- // chunkMetadataListMapInVector
- // .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
- // .add(chunkMetadata);
- // idx++;
- // }
- // }
- // }
- // }
- //
- // MetadataIndexNode metadataIndex = flushMetadataIndexV2(chunkMetadataListMap,
- // vectorToPathsMap);
- // TsFileMetadata tsFileMetaData = new TsFileMetadata();
- // tsFileMetaData.setMetadataIndex(metadataIndex);
- // tsFileMetaData.setMetaOffset(metaOffset);
- //
- // long footerIndex = out.getPosition();
- // if (logger.isDebugEnabled()) {
- // logger.debug("start to flush the footer,file pos:{}", footerIndex);
- // }
- //
- // // write TsFileMetaData
- // int size = tsFileMetaData.serializeTo(out.wrapAsStream());
- // if (logger.isDebugEnabled()) {
- // logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData,
- // out.getPosition());
- // }
- //
- // // write bloom filter
- // size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(),
- // chunkMetadataListMap.keySet());
- // if (logger.isDebugEnabled()) {
- // logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
- // }
- //
- // // write TsFileMetaData size
- // ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
- //
- // // write magic string
- // out.write(MAGIC_STRING_BYTES);
- //
- // // close file
- // out.close();
- // if (resourceLogger.isDebugEnabled() && file != null) {
- // resourceLogger.debug("{} writer is closed.", file.getName());
- // }
- // canWrite = false;
- // }
+
+ /**
+ * Finalizes the file using the B+ tree metadata index layout. Chunk metadata is grouped by
+ * series, the index is built by MetadataIndexConstructorV2, and the index plus footer
+ * (TsFileMetadata, bloom filter, size, root-node offset) is written to a sibling
+ * "&lt;file&gt;.index" file; the data file itself only receives metaOffset and the magic string.
+ */
+ public void endFileV2() throws IOException {
+ long metaOffset = out.getPosition();
+
+ // serialize the SEPARATOR of MetaData
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+ // group ChunkMetadata by series
+ // only contains ordinary path and time column of vector series
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+ // time column -> ChunkMetadataList TreeMap of value columns in vector
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new HashMap<>();
+
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
+ int idx = 0;
+ while (idx < chunkMetadatas.size()) {
+ IChunkMetadata chunkMetadata = chunkMetadatas.get(idx);
+ if (chunkMetadata.getMask() == 0) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ } else if (chunkMetadata.isTimeColumn()) {
+ // time column of a vector series
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector =
+ vectorToPathsMap.computeIfAbsent(series, key -> new TreeMap<>());
+
+ // value columns of a vector series
+ while (idx < chunkMetadatas.size() && chunkMetadatas.get(idx).isValueColumn()) {
+ chunkMetadata = chunkMetadatas.get(idx);
+ Path vectorSeries =
+ new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMapInVector
+ .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
+ .add(chunkMetadata);
+ idx++;
+ }
+ }
+ }
+ }
+
+ TsFileMetadata tsFileMetaData = new TsFileMetadata();
+ // NOTE(review): metadataIndexOutput is not closed if any write below throws — consider
+ // try-with-resources / try-finally around the index serialization.
+ TsFileOutput metadataIndexOutput =
+ new LocalTsFileOutput(new FileOutputStream(new File(file.getAbsolutePath() + ".index")));
+ MetadataIndexNodeV2 metadataIndex =
+ flushMetadataIndexV2(chunkMetadataListMap, vectorToPathsMap, metadataIndexOutput);
+ tsFileMetaData.setMetadataIndex(metadataIndex);
+
+ // Position where the footer (TsFileMetadata) starts in the .index file; presumably the root
+ // index node ends here — TODO confirm against the reader's rootNodeOffset handling.
+ long rootNodeOffset = metadataIndexOutput.getPosition();
+ // write TsFileMetaData
+ int size = tsFileMetaData.serializeToV2(metadataIndexOutput.wrapAsStream());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "finish flushing the footer {}, file pos:{}",
+ tsFileMetaData,
+ metadataIndexOutput.getPosition());
+ }
+
+ // write bloom filter
+ size +=
+ tsFileMetaData.serializeBloomFilter(
+ metadataIndexOutput.wrapAsStream(), chunkMetadataListMap.keySet());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "finish flushing the bloom filter file pos:{}", metadataIndexOutput.getPosition());
+ }
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(
+ size, metadataIndexOutput.wrapAsStream()); // write the size of the file metadata.
+ ReadWriteIOUtils.write(rootNodeOffset, metadataIndexOutput.wrapAsStream());
+
+ metadataIndexOutput.close();
+ // The data file records only where its metadata section began, then the magic string.
+ ReadWriteIOUtils.write(metaOffset, out.wrapAsStream());
+ // write magic string
+ out.write(MAGIC_STRING_BYTES);
+
+ // close file
+ out.close();
+ if (resourceLogger.isDebugEnabled() && file != null) {
+ resourceLogger.debug("{} writer is closed IN NEW WAY.", file.getName());
+ }
+ canWrite = false;
+ }
public void endFileV3() throws IOException {
long metaOffset = out.getPosition();
@@ -509,6 +510,94 @@ public class TsFileIOWriter {
canWrite = false;
}
+ /**
+ * Intended finalizer for the hash-bucket metadata index layout.
+ * NOTE(review): this body is currently a byte-for-byte duplicate of endFileV2 — it still calls
+ * flushMetadataIndexV2 and never uses flushMetadataIndexHash (which is itself a stub).
+ * Presumably a placeholder until the bucket layout is implemented — TODO wire it up or remove.
+ */
+ public void endFileHash() throws IOException {
+ long metaOffset = out.getPosition();
+
+ // serialize the SEPARATOR of MetaData
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+ // group ChunkMetadata by series
+ // only contains ordinary path and time column of vector series
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+ // time column -> ChunkMetadataList TreeMap of value columns in vector
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new HashMap<>();
+
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
+ int idx = 0;
+ while (idx < chunkMetadatas.size()) {
+ IChunkMetadata chunkMetadata = chunkMetadatas.get(idx);
+ if (chunkMetadata.getMask() == 0) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ } else if (chunkMetadata.isTimeColumn()) {
+ // time column of a vector series
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector =
+ vectorToPathsMap.computeIfAbsent(series, key -> new TreeMap<>());
+
+ // value columns of a vector series
+ while (idx < chunkMetadatas.size() && chunkMetadatas.get(idx).isValueColumn()) {
+ chunkMetadata = chunkMetadatas.get(idx);
+ Path vectorSeries =
+ new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMapInVector
+ .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
+ .add(chunkMetadata);
+ idx++;
+ }
+ }
+ }
+ }
+
+ TsFileMetadata tsFileMetaData = new TsFileMetadata();
+ // NOTE(review): metadataIndexOutput is not closed if any write below throws — consider
+ // try-with-resources / try-finally around the index serialization.
+ TsFileOutput metadataIndexOutput =
+ new LocalTsFileOutput(new FileOutputStream(new File(file.getAbsolutePath() + ".index")));
+ MetadataIndexNodeV2 metadataIndex =
+ flushMetadataIndexV2(chunkMetadataListMap, vectorToPathsMap, metadataIndexOutput);
+ tsFileMetaData.setMetadataIndex(metadataIndex);
+
+ long rootNodeOffset = metadataIndexOutput.getPosition();
+ // write TsFileMetaData
+ int size = tsFileMetaData.serializeToV2(metadataIndexOutput.wrapAsStream());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "finish flushing the footer {}, file pos:{}",
+ tsFileMetaData,
+ metadataIndexOutput.getPosition());
+ }
+
+ // write bloom filter
+ size +=
+ tsFileMetaData.serializeBloomFilter(
+ metadataIndexOutput.wrapAsStream(), chunkMetadataListMap.keySet());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "finish flushing the bloom filter file pos:{}", metadataIndexOutput.getPosition());
+ }
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(
+ size, metadataIndexOutput.wrapAsStream()); // write the size of the file metadata.
+ ReadWriteIOUtils.write(rootNodeOffset, metadataIndexOutput.wrapAsStream());
+
+ metadataIndexOutput.close();
+ ReadWriteIOUtils.write(metaOffset, out.wrapAsStream());
+ // write magic string
+ out.write(MAGIC_STRING_BYTES);
+
+ // close file
+ out.close();
+ if (resourceLogger.isDebugEnabled() && file != null) {
+ resourceLogger.debug("{} writer is closed IN NEW WAY.", file.getName());
+ }
+ canWrite = false;
+ }
+
+
/**
* Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
*
@@ -536,21 +625,41 @@ public class TsFileIOWriter {
deviceTimeseriesMetadataMap, out, metadataIndexOutput);
}
- private MetadataIndexNode flushMetadataIndexV2(
+ private MetadataIndexNodeV2 flushMetadataIndexV2(
Map<Path, List<IChunkMetadata>> chunkMetadataListMap,
- Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap,
+ TsFileOutput metadataIndexOutput)
throws IOException {
// convert ChunkMetadataList to this field
- deviceTimeseriesMetadataV2Map = new LinkedHashMap<>();
+ deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
// for ordinary path
- flushOneChunkMetadataV2(entry.getKey(), entry.getValue(), vectorToPathsMap);
+ flushOneChunkMetadata(entry.getKey(), entry.getValue(), vectorToPathsMap);
}
// construct TsFileMetadata and return
- return MetadataIndexConstructorV2.constructMetadataIndex(deviceTimeseriesMetadataV2Map, out);
+ return MetadataIndexConstructorV2.constructMetadataIndex(
+ deviceTimeseriesMetadataMap, out, metadataIndexOutput);
+ }
+
+ /**
+ * Stub for the hash-bucket metadata index: converts chunk metadata into
+ * deviceTimeseriesMetadataMap but builds no buckets yet.
+ * NOTE(review): always returns an empty array and never touches {@code metadataIndexOutput}
+ * or {@code vectorToPathsMap} — TODO implement bucket construction or remove.
+ */
+ private MetadataIndexBucket[] flushMetadataIndexHash(
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap,
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap,
+ TsFileOutput metadataIndexOutput)
+ throws IOException {
+
+ // convert ChunkMetadataList to this field
+ deviceTimeseriesMetadataMap = new LinkedHashMap<>();
+ // create device -> TimeseriesMetaDataList Map
+ for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+ // for ordinary path
+ flushOneChunkMetadata(entry.getKey(), entry.getValue(), vectorToPathsMap);
+ }
+
+ // construct TsFileMetadata and return
+ return new MetadataIndexBucket[0];
+ }
/**
@@ -607,50 +716,6 @@ public class TsFileIOWriter {
}
}
- private void flushOneChunkMetadataV2(
- Path path,
- List<IChunkMetadata> chunkMetadataList,
- Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
- throws IOException {
- // create TimeseriesMetaData
- PublicBAOS publicBAOS = new PublicBAOS();
- TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
-
- int chunkMetadataListLength = 0;
- boolean serializeStatistic = (chunkMetadataList.size() > 1);
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (!chunkMetadata.getDataType().equals(dataType)) {
- continue;
- }
- chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
- }
-
- TimeseriesMetadataV2 timeseriesMetadata =
- new TimeseriesMetadataV2(
- (byte)
- ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
- chunkMetadataListLength,
- path.getMeasurement(),
- dataType,
- publicBAOS);
- deviceTimeseriesMetadataV2Map
- .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
- .add(timeseriesMetadata);
-
- // no VECTOR
- // for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- // // chunkMetadata is time column of a vector series
- // if (chunkMetadata.isTimeColumn()) {
- // Map<Path, List<IChunkMetadata>> vectorMap = vectorToPathsMap.get(path);
- //
- // for (Map.Entry<Path, List<IChunkMetadata>> entry : vectorMap.entrySet()) {
- // flushOneChunkMetadataV2(entry.getKey(), entry.getValue(), vectorToPathsMap);
- // }
- // }
- // }
- }
-
/**
* get the length of normal OutputStream.
*
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorV2Test.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorV2Test.java
new file mode 100644
index 0000000..685cd32
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorV2Test.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.constant.TestConstant;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNodeV2;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** test for MetadataIndexConstructor */
+public class MetadataIndexConstructorV2Test {
+ private static final Logger logger =
+ LoggerFactory.getLogger(MetadataIndexConstructorV2Test.class);
+ private final TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ private static final String FILE_PATH =
+ TestConstant.BASE_OUTPUT_PATH.concat("MetadataIndexConstructorTest.tsfile");
+
+ private static final String measurementPrefix = "sensor_";
+ private static final String vectorPrefix = "vector_";
+ private int maxDegreeOfIndexNode;
+
+ /** Saves the configured max index-node degree, then lowers it to 10 so tests produce multi-level trees. */
+ @Before
+ public void before() {
+ maxDegreeOfIndexNode = conf.getMaxDegreeOfIndexNode();
+ conf.setMaxDegreeOfIndexNode(10);
+ }
+
+ /** Restores the max index-node degree saved in {@link #before()}. */
+ @After
+ public void after() {
+ conf.setMaxDegreeOfIndexNode(maxDegreeOfIndexNode);
+ }
+
+ /** Example 1: 5 entities with 5 measurements each */
+ @Test
+ public void singleIndexTest1() {
+ int deviceNum = 5;
+ int measurementNum = 5;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + i;
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 2: 1 entity with 150 measurements */
+ @Test
+ public void singleIndexTest2() {
+ int deviceNum = 1;
+ int measurementNum = 150;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + i;
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 3: 150 entities with 1 measurement each */
+ @Test
+ public void singleIndexTest3() {
+ int deviceNum = 150;
+ int measurementNum = 1;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + generateIndexString(i, deviceNum);
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 4: 150 entities with 150 measurements each */
+ @Test
+ public void singleIndexTest4() {
+ int deviceNum = 150;
+ int measurementNum = 150;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + generateIndexString(i, deviceNum);
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /**
+ * start test
+ *
+ * @param devices name and number of device
+ * @param vectorMeasurement the number of device and the number of values to include in the tablet
+ * @param singleMeasurement non-vector measurement name, set null if no need
+ */
+ private void test(String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
+ // 1. generate file
+ generateFile(devices, vectorMeasurement, singleMeasurement);
+ // 2. read metadata from file
+ List<String> actualPaths = new ArrayList<>(); // contains all device by sequence
+ readMetaDataDFS(actualPaths);
+
+ List<String> actualDevices = new ArrayList<>(); // contains all device by sequence
+ List<List<String>> actualMeasurements = new ArrayList<>(); // contains all device by sequence
+
+ // Fold the flat path list back into device -> measurements, relying on index order grouping
+ // all measurements of a device contiguously.
+ // NOTE(review): the split assumes every path is exactly "device.measurement" (depth 2) —
+ // TODO confirm for multi-level device names.
+ String lastDevice = null;
+ for (String path : actualPaths) {
+ String device = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[0];
+ String measurement = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[1];
+ if (!device.equals(lastDevice)) {
+ actualDevices.add(device);
+ List<String> measurements = new ArrayList<>();
+ measurements.add(measurement);
+ actualMeasurements.add(measurements);
+ } else {
+ actualMeasurements.get(actualMeasurements.size() - 1).add(measurement);
+ }
+ lastDevice = device;
+ }
+
+ // 3. generate correct result
+ List<String> correctDevices = new ArrayList<>(); // contains all device by sequence
+ List<List<String>> correctFirstMeasurements =
+ new ArrayList<>(); // contains first measurements of every leaf, group by device
+ generateCorrectResult(
+ correctDevices, correctFirstMeasurements, devices, vectorMeasurement, singleMeasurement);
+ // 4. compare correct result with TsFile's metadata
+ // NOTE(review): sorts the caller-supplied array in place (acceptable in a test helper).
+ Arrays.sort(devices);
+ // 4.1 make sure device in order
+ assertEquals(correctDevices.size(), devices.length);
+ assertEquals(actualDevices.size(), correctDevices.size());
+ for (int i = 0; i < actualDevices.size(); i++) {
+ assertEquals(actualDevices.get(i), correctDevices.get(i));
+ }
+ // 4.2 make sure timeseries in order
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+ reader.getAllTimeseriesMetadataV2();
+ for (int j = 0; j < actualDevices.size(); j++) {
+ for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
+ assertEquals(
+ allTimeseriesMetadata.get(actualDevices.get(j)).get(i).getMeasurementId(),
+ correctFirstMeasurements.get(j).get(i));
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ // 4.3 make sure split leaf correctly
+ // Each actual entry is expected to be the first measurement of a leaf, i.e. every
+ // maxDegreeOfIndexNode-th element of the full correct measurement list per device.
+ for (int j = 0; j < actualDevices.size(); j++) {
+ for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
+ assertEquals(
+ actualMeasurements.get(j).get(i),
+ correctFirstMeasurements.get(j).get(i * conf.getMaxDegreeOfIndexNode()));
+ }
+ }
+ }
+
+ /**
+ * read TsFile metadata, load actual message in devices and measurements
+ *
+ * @param paths load actual paths
+ */
+ private void readMetaDataDFS(List<String> paths) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ TsFileMetadata tsFileMetaData = reader.readFileMetadataV3();
+ MetadataIndexNodeV2 metadataIndexNode = tsFileMetaData.getMetadataIndexV2();
+ deviceDFS(paths, reader, metadataIndexNode);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** DFS in device level load actual devices */
+ private void deviceDFS(
+ List<String> paths, TsFileSequenceReader reader, MetadataIndexNodeV2 node) {
+ try {
+ for (int i = 0; i < node.getChildren().size(); i++) {
+ MetadataIndexEntry metadataIndexEntry = node.getChildren().get(i);
+ long endOffset = node.getEndOffset();
+ if (i != node.getChildren().size() - 1) {
+ endOffset = node.getChildren().get(i + 1).getOffset();
+ }
+ MetadataIndexNodeV2 subNode =
+ reader.getMetadataIndexNodeV2(metadataIndexEntry.getOffset(), endOffset);
+ if (node.isLeaf()) {
+ paths.add(metadataIndexEntry.getName());
+ } else {
+ deviceDFS(paths, reader, subNode);
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * generate correct devices and measurements for test Note that if the metadata index tree is
+ * re-designed, you may need to modify this function as well.
+ *
+ * @param correctDevices output
+ * @param correctMeasurements output
+ * @param devices input
+ * @param vectorMeasurement input
+ * @param singleMeasurement input
+ */
+ private void generateCorrectResult(
+ List<String> correctDevices,
+ List<List<String>> correctMeasurements,
+ String[] devices,
+ int[][] vectorMeasurement,
+ String[][] singleMeasurement) {
+ for (int i = 0; i < devices.length; i++) {
+ String device = devices[i];
+ correctDevices.add(device);
+ // generate measurement and sort
+ List<String> measurements = new ArrayList<>();
+ // single-variable measurement
+ if (singleMeasurement != null) {
+ measurements.addAll(Arrays.asList(singleMeasurement[i]));
+ }
+ // multi-variable measurement
+ for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
+ String vectorName =
+ vectorPrefix + generateIndexString(vectorIndex, vectorMeasurement.length);
+ measurements.add(vectorName);
+ int measurementNum = vectorMeasurement[i][vectorIndex];
+ for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
+ String measurementName =
+ measurementPrefix + generateIndexString(measurementIndex, measurementNum);
+ measurements.add(vectorName + TsFileConstant.PATH_SEPARATOR + measurementName);
+ }
+ }
+ Collections.sort(measurements);
+ correctMeasurements.add(measurements);
+ }
+ Collections.sort(correctDevices);
+ }
+
/**
 * Writes a fresh TsFile at {@code FILE_PATH} containing the given devices with their
 * single-variable and vector timeseries; fails the running test on any error.
 *
 * @param devices name and number of device
 * @param vectorMeasurement the number of device and the number of values to include in the tablet
 * @param singleMeasurement non-vector measurement name, set null if no need
 */
private void generateFile(
    String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
  File f = FSFactoryProducer.getFSFactory().getFile(FILE_PATH);
  // start from a clean slate: a leftover file from a previous run would taint the test
  if (f.exists() && !f.delete()) {
    fail("can not delete " + f.getAbsolutePath());
  }
  Schema schema = new Schema();
  try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
    // write single-variable timeseries
    if (singleMeasurement != null) {
      for (int i = 0; i < singleMeasurement.length; i++) {
        String device = devices[i];
        // register each measurement before writing its records (all INT64 / RLE)
        for (String measurement : singleMeasurement[i]) {
          tsFileWriter.registerTimeseries(
              new Path(device, measurement),
              new UnaryMeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
        }
        // the number of record rows
        int rowNum = 10;
        for (int row = 0; row < rowNum; row++) {
          // each record uses its row index as both timestamp and value
          TSRecord tsRecord = new TSRecord(row, device);
          for (String measurement : singleMeasurement[i]) {
            DataPoint dPoint = new LongDataPoint(measurement, row);
            tsRecord.addTuple(dPoint);
          }
          if (tsRecord.dataPointList.size() > 0) {
            tsFileWriter.write(tsRecord);
          }
        }
      }
    }

    // write multi-variable timeseries
    for (int i = 0; i < devices.length; i++) {
      String device = devices[i];
      logger.info("generating device {}...", device);
      // the number of rows to include in the tablet
      int rowNum = 10;
      for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
        // vector name is zero-padded to the width of the vector count for stable ordering
        String vectorName =
            vectorPrefix + generateIndexString(vectorIndex, vectorMeasurement.length);
        logger.info("generating vector {}...", vectorName);
        List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
        int measurementNum = vectorMeasurement[i][vectorIndex];
        String[] measurementNames = new String[measurementNum];
        TSDataType[] dataTypes = new TSDataType[measurementNum];
        for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
          String measurementName =
              measurementPrefix + generateIndexString(measurementIndex, measurementNum);
          logger.info("generating vector measurement {}...", measurementName);
          // add measurements into file schema (all with INT64 data type)
          measurementNames[measurementIndex] = measurementName;
          dataTypes[measurementIndex] = TSDataType.INT64;
        }
        IMeasurementSchema measurementSchema =
            new VectorMeasurementSchema(vectorName, measurementNames, dataTypes);
        measurementSchemas.add(measurementSchema);
        schema.registerTimeseries(new Path(device, vectorName), measurementSchema);
        // add measurements into TSFileWriter
        // construct the tablet
        Tablet tablet = new Tablet(device, measurementSchemas);
        long[] timestamps = tablet.timestamps;
        Object[] values = tablet.values;
        long timestamp = 1;
        long value = 1000000L;
        for (int r = 0; r < rowNum; r++, value++) {
          int row = tablet.rowSize++;
          timestamps[row] = timestamp++;
          // every sub-measurement of the vector shares the same value in a given row
          for (int j = 0; j < measurementNum; j++) {
            long[] sensor = (long[]) values[j];
            sensor[row] = value;
          }
          // write Tablet to TsFile once it is full, then reuse it
          if (tablet.rowSize == tablet.getMaxRowNumber()) {
            tsFileWriter.write(tablet);
            tablet.reset();
          }
        }
        // write remaining rows (a partially-filled Tablet) to TsFile
        if (tablet.rowSize != 0) {
          tsFileWriter.write(tablet);
          tablet.reset();
        }
      }
    }
  } catch (Exception e) {
    logger.error("meet error in TsFileWrite with tablet", e);
    fail(e.getMessage());
  }
}
+
+ /**
+ * generate curIndex string, use "0" on left to make sure align
+ *
+ * @param curIndex current index
+ * @param maxIndex max index
+ * @return curIndex's string
+ */
+ private String generateIndexString(int curIndex, int maxIndex) {
+ StringBuilder res = new StringBuilder(String.valueOf(curIndex));
+ String target = String.valueOf(maxIndex);
+ while (res.length() < target.length()) {
+ res.insert(0, "0");
+ }
+ return res.toString();
+ }
+}