You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/12/28 01:59:51 UTC
[iotdb] 01/01: Init commit
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch tsfile_v4
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 613c583f62bae4e3800aef3bcd11d63516e54658
Author: Zesong Sun <v-...@microsoft.com>
AuthorDate: Tue Dec 28 09:44:21 2021 +0800
Init commit
---
.../apache/iotdb/db/tools/TsFileSketchTool.java | 1 +
.../iotdb/tsfile/common/conf/TSFileConfig.java | 11 +
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 28 +-
.../metadataIndex/BPlusTreeConstructor.java | 134 +++++++++
.../BPlusTreeNode.java} | 42 ++-
.../MetadataIndexConstructor.java | 43 +--
.../{ => metadataIndex}/MetadataIndexEntry.java | 2 +-
.../{ => metadataIndex}/MetadataIndexNode.java | 10 +-
.../metadata/metadataIndex/MetadataIndexType.java} | 18 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 140 +++++++++-
.../v2/file/metadata/MetadataIndexEntryV2.java | 2 +-
.../v2/file/metadata/MetadataIndexNodeV2.java | 4 +-
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 17 +-
.../write/writer/ForceAppendTsFileWriter.java | 5 +-
.../write/writer/RestorableTsFileIOWriter.java | 18 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 138 ++++++---
.../{ => metadataIndex}/MetadataIndexNodeTest.java | 2 +-
.../tsfile/file/metadata/utils/TestHelper.java | 4 +-
.../iotdb/tsfile/file/metadata/utils/Utils.java | 2 +-
.../apache/iotdb/tsfile/utils/FileGenerator.java | 130 ++++++++-
.../write/BPlusTreeIndexConstructorTest.java | 311 +++++++++++++++++++++
.../tsfile/write/MetadataIndexConstructorTest.java | 157 ++---------
.../write/writer/AlignedChunkWriterImplTest.java | 4 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 4 +-
.../tsfile/write/writer/TimeChunkWriterTest.java | 4 +-
.../tsfile/write/writer/ValueChunkWriterTest.java | 7 +-
26 files changed, 945 insertions(+), 293 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 947c1b7..e48a6a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.*;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index d13f817..bbadcc0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.conf;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
import org.apache.iotdb.tsfile.fileSystem.FSType;
import java.io.Serializable;
@@ -77,6 +78,8 @@ public class TSFileConfig implements Serializable {
private int maxNumberOfPointsInPage = 1024 * 1024;
/** The maximum degree of a metadataIndex node, default value is 256 */
private int maxDegreeOfIndexNode = 256;
+ /** The type of MetadataIndex */
+ private MetadataIndexType metadataIndexType = MetadataIndexType.ORIGIN;
/** Data type for input timestamp, TsFile supports INT64. */
private TSDataType timeSeriesDataType = TSDataType.INT64;
/** Max length limitation of input string. */
@@ -180,6 +183,14 @@ public class TSFileConfig implements Serializable {
this.maxDegreeOfIndexNode = maxDegreeOfIndexNode;
}
+ /**
+ * Returns the configured type of MetadataIndex.
+ *
+ * @return the current value of the metadataIndexType field (initialized to
+ * MetadataIndexType.ORIGIN)
+ */
+ public MetadataIndexType getMetadataIndexType() {
+ return metadataIndexType;
+ }
+
+ /**
+ * Sets the type of MetadataIndex.
+ *
+ * @param metadataIndexType value stored in the metadataIndexType field; no validation is done
+ */
+ public void setMetadataIndexType(MetadataIndexType metadataIndexType) {
+ this.metadataIndexType = metadataIndexType;
+ }
+
public TSDataType getTimeSeriesDataType() {
return timeSeriesDataType;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2..47f5868 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -19,7 +19,11 @@
package org.apache.iotdb.tsfile.file.metadata;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.BPlusTreeNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -33,10 +37,12 @@ import java.util.Set;
/** TSFileMetaData collects all metadata info and saves in its data structure. */
public class TsFileMetadata {
+ private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
// bloom filter
private BloomFilter bloomFilter;
- // List of <name, offset, childMetadataIndexType>
+ // index of TimeseriesMetadataIndex
private MetadataIndexNode metadataIndex;
// offset of MetaMarker.SEPARATOR
@@ -52,11 +58,15 @@ public class TsFileMetadata {
TsFileMetadata fileMetaData = new TsFileMetadata();
// metadataIndex
- fileMetaData.metadataIndex = MetadataIndexNode.deserializeFrom(buffer);
-
- // metaOffset
- long metaOffset = ReadWriteIOUtils.readLong(buffer);
- fileMetaData.setMetaOffset(metaOffset);
+ if (config.getMetadataIndexType().equals(MetadataIndexType.ORIGIN)) {
+ fileMetaData.metadataIndex = MetadataIndexNode.deserializeFrom(buffer);
+
+ // metaOffset
+ long metaOffset = ReadWriteIOUtils.readLong(buffer);
+ fileMetaData.setMetaOffset(metaOffset);
+ } else if (config.getMetadataIndexType().equals(MetadataIndexType.B_PLUS_TREE)) {
+ fileMetaData.metadataIndex = BPlusTreeNode.deserializeFrom(buffer);
+ }
// read bloom filter
if (buffer.hasRemaining()) {
@@ -89,13 +99,11 @@ public class TsFileMetadata {
// metadataIndex
if (metadataIndex != null) {
byteLen += metadataIndex.serializeTo(outputStream);
+ // metaOffset
+ byteLen += ReadWriteIOUtils.write(metaOffset, outputStream);
} else {
byteLen += ReadWriteIOUtils.write(0, outputStream);
}
-
- // metaOffset
- byteLen += ReadWriteIOUtils.write(metaOffset, outputStream);
-
return byteLen;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeConstructor.java
new file mode 100644
index 0000000..f468a20
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeConstructor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+/**
+ * Static helper that serializes TimeseriesMetadata and builds a tree of BPlusTreeNode over them.
+ * The TimeseriesMetadata are written to one output and the non-root index nodes to another; the
+ * root node is returned to the caller.
+ */
+public class BPlusTreeConstructor {
+
+ // shared TsFile configuration; read here for the maximum degree of an index node
+ private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
+ // not instantiable: only static methods are provided
+ private BPlusTreeConstructor() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ /**
+ * Construct metadata index tree
+ *
+ * <p>Every TimeseriesMetadata of every device is serialized to tsFileOutput in iteration order
+ * of the map. An index entry named device + PATH_SEPARATOR + measurementId, holding the
+ * tsFileOutput position at which the series is about to be written, is added to the current
+ * leaf node for the first series of each device and again once maxDegreeOfIndexNode series have
+ * been serialized since the previous entry. Devices with an empty list are skipped. Leaf nodes
+ * are queued when full, and the queue is then reduced to a single root by generateRootNode.
+ *
+ * @param deviceTimeseriesMetadataMap device => TimeseriesMetadata list
+ * @param tsFileOutput tsfile output
+ * @param metadataIndexOutput metadataIndex output
+ * @return the root node of the index; it is not serialized by this method
+ * @throws IOException if writing to or querying the position of an output fails
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public static BPlusTreeNode constructMetadataIndex(
+ Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap,
+ TsFileOutput tsFileOutput,
+ TsFileOutput metadataIndexOutput)
+ throws IOException {
+
+ Queue<BPlusTreeNode> metadataIndexQueue = new ArrayDeque<>();
+ BPlusTreeNode currentIndexNode = new BPlusTreeNode();
+ currentIndexNode.setLeaf(true);
+ // number of TimeseriesMetadata serialized since the last index entry was added
+ int serializedTimeseriesMetadataNum = 0;
+ boolean isNewDevice;
+ // for timeseriesMetadata of each device
+ for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ continue;
+ }
+ isNewDevice = true;
+ TimeseriesMetadata timeseriesMetadata;
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ timeseriesMetadata = entry.getValue().get(i);
+ // start a new index entry at the very first series, at the first series of a device, and
+ // whenever the previous entry already covers maxDegreeOfIndexNode series
+ if (serializedTimeseriesMetadataNum == 0
+ || serializedTimeseriesMetadataNum >= config.getMaxDegreeOfIndexNode()
+ || isNewDevice) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexQueue, tsFileOutput);
+ currentIndexNode = new BPlusTreeNode();
+ currentIndexNode.setLeaf(true);
+ }
+ // the offset must be taken before the series is serialized so that it marks its start
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(
+ entry.getKey()
+ + TsFileConstant.PATH_SEPARATOR
+ + timeseriesMetadata.getMeasurementId(),
+ tsFileOutput.getPosition()));
+ serializedTimeseriesMetadataNum = 0;
+ isNewDevice = false;
+ }
+ timeseriesMetadata.serializeTo(tsFileOutput.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
+ }
+ }
+ // queue the last leaf node; it may be only partially filled, or empty if nothing was serialized
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexQueue, tsFileOutput);
+ return generateRootNode(metadataIndexQueue, metadataIndexOutput);
+ }
+
+ /**
+ * Reduce the queued nodes to a single root node. Until exactly one node is left in the queue,
+ * every queued node is serialized to out and referenced from a new parent node by an entry that
+ * carries the name of the node's first entry and the position in out at which the node was
+ * written. Parent nodes are queued once they are full or when the current level is finished, so
+ * the tree grows one level per pass. The remaining node is returned without being serialized.
+ *
+ * @param metadataIndexNodeQueue queue of nodes of the current level
+ * @param out output that receives the serialized nodes (the caller passes metadataIndexOutput)
+ * @return the single node left in the queue
+ * @throws IOException if writing to or querying the position of out fails
+ */
+ private static BPlusTreeNode generateRootNode(
+ Queue<BPlusTreeNode> metadataIndexNodeQueue, TsFileOutput out) throws IOException {
+ int queueSize = metadataIndexNodeQueue.size();
+ BPlusTreeNode metadataIndexNode;
+ // parent nodes keep the default isLeaf = false set by the no-arg constructor
+ BPlusTreeNode currentIndexNode = new BPlusTreeNode();
+ while (queueSize != 1) {
+ // only the nodes queued before this pass are polled; parents are appended behind them
+ for (int i = 0; i < queueSize; i++) {
+ metadataIndexNode = metadataIndexNodeQueue.poll();
+ // when constructing from internal node, each node is related to an entry
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
+ currentIndexNode = new BPlusTreeNode();
+ }
+ // record the child's start position before writing it
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
+ metadataIndexNode.serializeTo(out.wrapAsStream());
+ }
+ addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
+ currentIndexNode = new BPlusTreeNode();
+ queueSize = metadataIndexNodeQueue.size();
+ }
+ return metadataIndexNodeQueue.poll();
+ }
+
+ /**
+ * Set the end offset of the node to the current position of out and append it to the queue.
+ *
+ * @param currentIndexNode node to finish
+ * @param metadataIndexNodeQueue queue the node is added to
+ * @param out output whose current position becomes the node's end offset
+ * @throws IOException if the position of out cannot be obtained
+ */
+ private static void addCurrentIndexNodeToQueue(
+ BPlusTreeNode currentIndexNode, Queue<BPlusTreeNode> metadataIndexNodeQueue, TsFileOutput out)
+ throws IOException {
+ currentIndexNode.setEndOffset(out.getPosition());
+ metadataIndexNodeQueue.add(currentIndexNode);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeNode.java
similarity index 79%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeNode.java
index 3f6f633..5ea254d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/BPlusTreeNode.java
@@ -17,11 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.tsfile.file.metadata;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -32,26 +31,21 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class MetadataIndexNode {
+public class BPlusTreeNode extends MetadataIndexNode {
private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
- private final List<MetadataIndexEntry> children;
- private long endOffset;
+ private boolean isLeaf;
- /** type of the child node at offset */
- private final MetadataIndexNodeType nodeType;
-
- public MetadataIndexNode(MetadataIndexNodeType nodeType) {
+ public BPlusTreeNode() {
children = new ArrayList<>();
endOffset = -1L;
- this.nodeType = nodeType;
+ isLeaf = false;
}
- public MetadataIndexNode(
- List<MetadataIndexEntry> children, long endOffset, MetadataIndexNodeType nodeType) {
+ public BPlusTreeNode(List<MetadataIndexEntry> children, long endOffset, boolean isLeaf) {
this.children = children;
this.endOffset = endOffset;
- this.nodeType = nodeType;
+ this.isLeaf = isLeaf;
}
public List<MetadataIndexEntry> getChildren() {
@@ -66,10 +60,6 @@ public class MetadataIndexNode {
this.endOffset = endOffset;
}
- public MetadataIndexNodeType getNodeType() {
- return nodeType;
- }
-
public void addEntry(MetadataIndexEntry metadataIndexEntry) {
this.children.add(metadataIndexEntry);
}
@@ -78,6 +68,14 @@ public class MetadataIndexNode {
return children.size() >= config.getMaxDegreeOfIndexNode();
}
+ public boolean isLeaf() {
+ return isLeaf;
+ }
+
+ public void setLeaf(boolean leaf) {
+ isLeaf = leaf;
+ }
+
MetadataIndexEntry peek() {
if (children.isEmpty()) {
return null;
@@ -92,20 +90,20 @@ public class MetadataIndexNode {
byteLen += metadataIndexEntry.serializeTo(outputStream);
}
byteLen += ReadWriteIOUtils.write(endOffset, outputStream);
- byteLen += ReadWriteIOUtils.write(nodeType.serialize(), outputStream);
+ byteLen += ReadWriteIOUtils.write(isLeaf, outputStream);
return byteLen;
}
- public static MetadataIndexNode deserializeFrom(ByteBuffer buffer) {
+ public static BPlusTreeNode deserializeFrom(ByteBuffer buffer) {
List<MetadataIndexEntry> children = new ArrayList<>();
int size = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
for (int i = 0; i < size; i++) {
children.add(MetadataIndexEntry.deserializeFrom(buffer));
}
long offset = ReadWriteIOUtils.readLong(buffer);
- MetadataIndexNodeType nodeType =
- MetadataIndexNodeType.deserialize(ReadWriteIOUtils.readByte(buffer));
- return new MetadataIndexNode(children, offset, nodeType);
+ boolean isLeaf = ReadWriteIOUtils.readBool(buffer);
+
+ return new BPlusTreeNode(children, offset, isLeaf);
}
public Pair<MetadataIndexEntry, Long> getChildIndexEntry(String key, boolean exactSearch) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexConstructor.java
similarity index 82%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexConstructor.java
index 062ffd6..1430a3d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexConstructor.java
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.tsfile.file.metadata;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
@@ -44,11 +45,13 @@ public class MetadataIndexConstructor {
* Construct metadata index tree
*
* @param deviceTimeseriesMetadataMap device => TimeseriesMetadata list
- * @param out tsfile output
+ * @param tsFileOutput tsfile output
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static MetadataIndexNode constructMetadataIndex(
- Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap, TsFileOutput out)
+ Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap,
+ TsFileOutput tsFileOutput,
+ TsFileOutput indexFileOutput)
throws IOException {
Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
@@ -66,19 +69,23 @@ public class MetadataIndexConstructor {
timeseriesMetadata = entry.getValue().get(i);
if (i % config.getMaxDegreeOfIndexNode() == 0) {
if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ addCurrentIndexNodeToQueue(
+ currentIndexNode, measurementMetadataIndexQueue, tsFileOutput);
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
}
currentIndexNode.addEntry(
- new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ new MetadataIndexEntry(
+ timeseriesMetadata.getMeasurementId(), tsFileOutput.getPosition()));
}
- timeseriesMetadata.serializeTo(out.wrapAsStream());
+ timeseriesMetadata.serializeTo(tsFileOutput.wrapAsStream());
}
- addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, tsFileOutput);
deviceMetadataIndexMap.put(
entry.getKey(),
generateRootNode(
- measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ measurementMetadataIndexQueue,
+ tsFileOutput,
+ MetadataIndexNodeType.INTERNAL_MEASUREMENT));
}
// if not exceed the max child nodes num, ignore the device index and directly point to the
@@ -87,10 +94,11 @@ public class MetadataIndexConstructor {
MetadataIndexNode metadataIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
- metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
- entry.getValue().serializeTo(out.wrapAsStream());
+ metadataIndexNode.addEntry(
+ new MetadataIndexEntry(entry.getKey(), tsFileOutput.getPosition()));
+ entry.getValue().serializeTo(tsFileOutput.wrapAsStream());
}
- metadataIndexNode.setEndOffset(out.getPosition());
+ metadataIndexNode.setEndOffset(tsFileOutput.getPosition());
return metadataIndexNode;
}
@@ -101,16 +109,17 @@ public class MetadataIndexConstructor {
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
// when constructing from internal node, each node is related to an entry
if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, tsFileOutput);
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
}
- currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
- entry.getValue().serializeTo(out.wrapAsStream());
+ currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), tsFileOutput.getPosition()));
+ entry.getValue().serializeTo(tsFileOutput.wrapAsStream());
}
- addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, tsFileOutput);
MetadataIndexNode deviceMetadataIndexNode =
- generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
- deviceMetadataIndexNode.setEndOffset(out.getPosition());
+ generateRootNode(
+ deviceMetadataIndexQueue, tsFileOutput, MetadataIndexNodeType.INTERNAL_DEVICE);
+ deviceMetadataIndexNode.setEndOffset(tsFileOutput.getPosition());
return deviceMetadataIndexNode;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexEntry.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexEntry.java
similarity index 96%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexEntry.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexEntry.java
index 5bf2cca..a444ec2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexEntry.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexEntry.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tsfile.file.metadata;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNode.java
similarity index 95%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNode.java
index 3f6f633..8ec7e1e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNode.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tsfile.file.metadata;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -35,11 +35,11 @@ import java.util.List;
public class MetadataIndexNode {
private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
- private final List<MetadataIndexEntry> children;
- private long endOffset;
+ protected List<MetadataIndexEntry> children;
+ protected long endOffset;
+ private MetadataIndexNodeType nodeType;
- /** type of the child node at offset */
- private final MetadataIndexNodeType nodeType;
+ public MetadataIndexNode() {}
public MetadataIndexNode(MetadataIndexNodeType nodeType) {
children = new ArrayList<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexType.java
similarity index 61%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexType.java
index a179c1d..8c73326 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexType.java
@@ -16,20 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.v2.file.metadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
-import java.nio.ByteBuffer;
-
-public class MetadataIndexEntryV2 {
-
- private MetadataIndexEntryV2() {}
-
- public static MetadataIndexEntry deserializeFrom(ByteBuffer buffer) {
- String name = ReadWriteIOUtils.readString(buffer);
- long offset = ReadWriteIOUtils.readLong(buffer);
- return new MetadataIndexEntry(name, offset);
- }
+public enum MetadataIndexType {
+ ORIGIN,
+ B_PLUS_TREE
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index bfb8e31..98d6817 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
@@ -32,14 +33,16 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.BPlusTreeNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -91,6 +94,7 @@ public class TsFileSequenceReader implements AutoCloseable {
private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024;
protected String file;
protected TsFileInput tsFileInput;
+ protected TsFileInput indexFileInput;
protected long fileMetadataPos;
protected int fileMetadataSize;
private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
@@ -128,6 +132,9 @@ public class TsFileSequenceReader implements AutoCloseable {
}
this.file = file;
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
+ if (FSFactoryProducer.getFSFactory().getFile(file + ".index").exists()) {
+ indexFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file + ".index");
+ }
try {
if (loadMetadataSize) {
loadMetadataSize();
@@ -262,9 +269,23 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public TsFileMetadata readFileMetadata() throws IOException {
try {
+// if (tsFileMetaData == null) {
+// tsFileMetaData =
+// TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+// }
if (tsFileMetaData == null) {
+ ByteBuffer rootNodeOffsetBuffer = ByteBuffer.allocate(Long.BYTES);
+ indexFileInput.read(rootNodeOffsetBuffer, indexFileInput.size() - Long.BYTES);
+ rootNodeOffsetBuffer.flip();
+
+ long rootNodeOffset = ReadWriteIOUtils.readLong(rootNodeOffsetBuffer);
tsFileMetaData =
- TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+ TsFileMetadata.deserializeFrom(
+ readData(
+ rootNodeOffset,
+ FSFactoryProducer.getFSFactory().getFile(this.file + ".index").length(),
+ indexFileInput
+ ));
}
} catch (BufferOverflowException e) {
logger.error("Something error happened while reading file metadata of file {}", file);
@@ -753,6 +774,48 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
+ /**
+ * Recursively walks a B+ tree metadata index and collects the TimeseriesMetadata it points to.
+ *
+ * <p>If {@code isLeaf} is true, {@code buffer} is read as consecutive serialized
+ * TimeseriesMetadata, which are appended to the list stored under the part of the entry name
+ * that precedes the first match of PATH_SEPARATER_NO_REGEX. Otherwise {@code buffer} is
+ * deserialized as a BPlusTreeNode and each of its children is read and visited recursively.
+ *
+ * @param metadataIndex entry whose target data is contained in {@code buffer}
+ * @param buffer data read between the entry's offset and the following offset
+ * @param isLeaf whether {@code buffer} holds TimeseriesMetadata rather than an index node
+ * @param timeseriesMetadataMap device => TimeseriesMetadata list; this method appends to it
+ * @param needChunkMetadata forwarded to TimeseriesMetadata.deserializeFrom
+ * @throws IOException if reading the data of a child fails
+ */
+ private void generateMetadataIndex(
+ MetadataIndexEntry metadataIndex,
+ ByteBuffer buffer,
+ boolean isLeaf,
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap,
+ boolean needChunkMetadata)
+ throws IOException {
+ try {
+ if (isLeaf) {
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata));
+ }
+ // NOTE(review): only the text before the first separator is kept as the key; if device
+ // names can themselves contain the separator the key is truncated - confirm
+ timeseriesMetadataMap
+ .computeIfAbsent(
+ metadataIndex.getName().split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[0],
+ k -> new ArrayList<>())
+ .addAll(timeseriesMetadataList);
+ } else {
+ // Use the BPlusTreeNode factory: MetadataIndexNode.deserializeFrom builds a plain
+ // MetadataIndexNode (and parses the trailing byte as a node type instead of the leaf
+ // flag), so casting its result to BPlusTreeNode fails with a ClassCastException.
+ BPlusTreeNode metadataIndexNode = BPlusTreeNode.deserializeFrom(buffer);
+ List<MetadataIndexEntry> children = metadataIndexNode.getChildren();
+ int metadataIndexListSize = children.size();
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ // a child ends where the next one starts; the last child ends at the node's end offset
+ long endOffset = metadataIndexNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = children.get(i + 1).getOffset();
+ }
+ // NOTE(review): BPlusTreeConstructor serializes non-root nodes to the metadataIndex
+ // output, so children of a non-leaf node presumably have to be read from
+ // indexFileInput rather than through this readData overload - confirm
+ ByteBuffer nextBuffer = readData(children.get(i).getOffset(), endOffset);
+ generateMetadataIndex(
+ children.get(i),
+ nextBuffer,
+ metadataIndexNode.isLeaf(),
+ timeseriesMetadataMap,
+ needChunkMetadata);
+ }
+ }
+ } catch (BufferOverflowException e) {
+ logger.error("Something error happened while generating MetadataIndex of file {}", file);
+ throw e;
+ }
+ }
+
/* TimeseriesMetadata don't need deserialize chunk metadata list */
public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
if (tsFileMetaData == null) {
@@ -768,13 +831,43 @@ public class TsFileSequenceReader implements AutoCloseable {
endOffset = metadataIndexEntryList.get(i + 1).getOffset();
}
ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
+ if (config.getMetadataIndexType().equals(MetadataIndexType.ORIGIN)) {
+ generateMetadataIndex(
+ metadataIndexEntry,
+ buffer,
+ null,
+ metadataIndexNode.getNodeType(),
+ timeseriesMetadataMap,
+ false);
+ } else if (config.getMetadataIndexType().equals(MetadataIndexType.B_PLUS_TREE)) {
+ generateMetadataIndex(
+ metadataIndexEntry,
+ buffer,
+ ((BPlusTreeNode) metadataIndexNode).isLeaf(),
+ timeseriesMetadataMap,
+ false);
+ }
+ }
+ return timeseriesMetadataMap;
+ }
+
+ public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadataInBPlusTree()
+ throws IOException {
+ if (tsFileMetaData == null) {
+ readFileMetadata();
+ }
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>();
+ BPlusTreeNode metadataIndexNode = (BPlusTreeNode) tsFileMetaData.getMetadataIndex();
+ List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
+ for (int i = 0; i < metadataIndexEntryList.size(); i++) {
+ MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
+ long endOffset = tsFileMetaData.getMetadataIndex().getEndOffset();
+ if (i != metadataIndexEntryList.size() - 1) {
+ endOffset = metadataIndexEntryList.get(i + 1).getOffset();
+ }
+ ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
generateMetadataIndex(
- metadataIndexEntry,
- buffer,
- null,
- metadataIndexNode.getNodeType(),
- timeseriesMetadataMap,
- false);
+ metadataIndexEntry, buffer, metadataIndexNode.isLeaf(), timeseriesMetadataMap, false);
}
return timeseriesMetadataMap;
}
@@ -1070,6 +1163,31 @@ public class TsFileSequenceReader implements AutoCloseable {
return readData(start, (int) (end - start));
}
+ /**
+ * Reads the byte range that begins at start and stops before end from the given input.
+ *
+ * <p>Delegates to the (position, size) overload with size = (int) (end - start). Because of the
+ * narrowing cast, a range longer than Integer.MAX_VALUE bytes is not representable here.
+ *
+ * @param start offset of the first byte of the range
+ * @param end offset just past the last byte of the range
+ * @param tsFileInput input to read from
+ * @return the buffer produced by the (position, size) overload
+ * @throws IOException propagated from the (position, size) overload
+ */
+ protected ByteBuffer readData(long start, long end, TsFileInput tsFileInput) throws IOException {
+ return readData(start, (int) (end - start), tsFileInput);
+ }
+
+ /**
+ * Reads exactly size bytes from the given input into a newly allocated buffer.
+ *
+ * @param position offset to read from; a negative value selects the readAsPossible overload
+ * that takes no position (presumably it reads from the input's current position - confirm)
+ * @param size number of bytes to read; also the capacity of the returned buffer
+ * @param tsFileInput input to read from
+ * @return the filled buffer, flipped so that it can be read from its beginning
+ * @throws IOException if fewer than size bytes could be read
+ */
+ protected ByteBuffer readData(long position, int size, TsFileInput tsFileInput)
+ throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ if (position < 0) {
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+ throw new IOException("reach the end of the data");
+ }
+ } else {
+ long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+ if (actualReadSize != size) {
+ throw new IOException(
+ String.format(
+ "reach the end of the data. Size of data that want to read: %s,"
+ + "actual read size: %s, position: %s",
+ size, actualReadSize, position));
+ }
+ }
+ // switch the buffer from being filled to being read
+ buffer.flip();
+ return buffer;
+ }
+
/** notice, the target bytebuffer are not flipped. */
public int readRaw(long position, int length, ByteBuffer target) throws IOException {
return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length);
@@ -1397,6 +1515,10 @@ public class TsFileSequenceReader implements AutoCloseable {
return MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset));
}
+ /**
+ * Reads the bytes between startOffset and endOffset with readData and deserializes them via
+ * BPlusTreeNode.deserializeFrom.
+ *
+ * @param startOffset offset where the serialized node begins
+ * @param endOffset offset where the serialized node ends
+ * @return the node produced by BPlusTreeNode.deserializeFrom
+ * @throws IOException if the region cannot be read
+ */
+ public BPlusTreeNode getBPlusTreeIndexNode(long startOffset, long endOffset) throws IOException {
+ return BPlusTreeNode.deserializeFrom(readData(startOffset, endOffset));
+ }
+
/**
* Check if the device has at least one Chunk in this partition
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
index a179c1d..90024d1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.tsfile.v2.file.metadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
index 11d0d1d..e3e46c3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.tsfile.v2.file.metadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 49553e4..f554fb6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -22,9 +22,15 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -41,8 +47,15 @@ import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.stream.Collectors;
public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 83ee04e..f6cb00a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -46,7 +46,8 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
if (logger.isDebugEnabled()) {
logger.debug("{} writer is opened.", file.getName());
}
- this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
+ this.tsFileOutput =
+ FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
this.file = file;
// file doesn't exist
@@ -77,7 +78,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
}
public void doTruncate() throws IOException {
- out.truncate(truncatePosition);
+ tsFileOutput.truncate(truncatePosition);
}
public long getTruncatePosition() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 2ea48fa..169b29b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -79,7 +79,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
logger.debug("{} is opened.", file.getName());
}
this.file = file;
- this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
+ this.tsFileOutput =
+ FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
// file doesn't exist
if (file.length() == 0) {
@@ -98,16 +99,16 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
- out.close();
+ tsFileOutput.close();
} else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
- out.close();
+ tsFileOutput.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else {
crashed = true;
canWrite = true;
// remove broken data
- out.truncate(truncatedSize);
+ tsFileOutput.truncate(truncatedSize);
}
}
}
@@ -118,7 +119,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
logger.debug("{} is opened.", file.getName());
}
this.file = file;
- this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
+ this.tsFileOutput =
+ FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
// file doesn't exist
if (file.length() == 0) {
@@ -137,9 +139,9 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
- out.close();
+ tsFileOutput.close();
} else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
- out.close();
+ tsFileOutput.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else {
@@ -147,7 +149,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
canWrite = true;
// remove broken data
if (truncate) {
- out.truncate(truncatedSize);
+ tsFileOutput.truncate(truncatedSize);
}
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index b862890..565d79a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -23,10 +23,19 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.BPlusTreeConstructor;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.BPlusTreeNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -41,7 +50,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
@@ -59,7 +74,10 @@ public class TsFileIOWriter {
VERSION_NUMBER_BYTE = TSFileConfig.VERSION_NUMBER;
}
- protected TsFileOutput out;
+ // output of TsFile data area (.tsfile)
+ protected TsFileOutput tsFileOutput;
+ // output of TsFile index area (.tsfile.index)
+ protected TsFileOutput indexFileOutput;
protected boolean canWrite = true;
protected File file;
@@ -91,7 +109,10 @@ public class TsFileIOWriter {
* @throws IOException if I/O error occurs
*/
public TsFileIOWriter(File file) throws IOException {
- this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
+ this.tsFileOutput =
+ FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
+ this.indexFileOutput =
+ FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath() + ".index", false);
this.file = file;
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} writer is opened.", file.getName());
@@ -105,15 +126,10 @@ public class TsFileIOWriter {
* @param output be used to output written data
*/
public TsFileIOWriter(TsFileOutput output) throws IOException {
- this.out = output;
+ this.tsFileOutput = output;
startFile();
}
- /** for test only */
- public TsFileIOWriter(TsFileOutput output, boolean test) {
- this.out = output;
- }
-
/**
* Writes given bytes to output stream. This method is called when total memory size exceeds the
* chunk group size threshold.
@@ -122,22 +138,22 @@ public class TsFileIOWriter {
* @throws IOException if an I/O error occurs.
*/
public void writeBytesToStream(PublicBAOS bytes) throws IOException {
- bytes.writeTo(out.wrapAsStream());
+ bytes.writeTo(tsFileOutput.wrapAsStream());
}
protected void startFile() throws IOException {
- out.write(MAGIC_STRING_BYTES);
- out.write(VERSION_NUMBER_BYTE);
+ tsFileOutput.write(MAGIC_STRING_BYTES);
+ tsFileOutput.write(VERSION_NUMBER_BYTE);
}
public void startChunkGroup(String deviceId) throws IOException {
this.currentChunkGroupDeviceId = deviceId;
if (logger.isDebugEnabled()) {
- logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
+ logger.debug("start chunk group:{}, file position {}", deviceId, tsFileOutput.getPosition());
}
chunkMetadataList = new ArrayList<>();
ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId);
- chunkGroupHeader.serializeTo(out.wrapAsStream());
+ chunkGroupHeader.serializeTo(tsFileOutput.wrapAsStream());
}
/**
@@ -151,7 +167,7 @@ public class TsFileIOWriter {
new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
- out.flush();
+ tsFileOutput.flush();
}
/**
@@ -187,7 +203,7 @@ public class TsFileIOWriter {
throws IOException {
currentChunkMetadata =
- new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
+ new ChunkMetadata(measurementId, tsDataType, tsFileOutput.getPosition(), statistics);
currentChunkMetadata.setMask((byte) mask);
ChunkHeader header =
@@ -199,7 +215,7 @@ public class TsFileIOWriter {
encodingType,
numOfPages,
mask);
- header.serializeTo(out.wrapAsStream());
+ header.serializeTo(tsFileOutput.wrapAsStream());
}
/** Write a whole chunk in another file into this file. Providing fast merge for IoTDB. */
@@ -209,10 +225,10 @@ public class TsFileIOWriter {
new ChunkMetadata(
chunkHeader.getMeasurementID(),
chunkHeader.getDataType(),
- out.getPosition(),
+ tsFileOutput.getPosition(),
chunkMetadata.getStatistics());
- chunkHeader.serializeTo(out.wrapAsStream());
- out.write(chunk.getData());
+ chunkHeader.serializeTo(tsFileOutput.wrapAsStream());
+ tsFileOutput.write(chunk.getData());
endCurrentChunk();
if (logger.isDebugEnabled()) {
logger.debug(
@@ -235,10 +251,10 @@ public class TsFileIOWriter {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void endFile() throws IOException {
- long metaOffset = out.getPosition();
+ long metaOffset = tsFileOutput.getPosition();
// serialize the SEPARATOR of MetaData
- ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, tsFileOutput.wrapAsStream());
// group ChunkMetadata by series
Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
@@ -251,36 +267,45 @@ public class TsFileIOWriter {
}
}
- MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
- tsFileMetaData.setMetadataIndex(metadataIndex);
- tsFileMetaData.setMetaOffset(metaOffset);
+ if (config.getMetadataIndexType().equals(MetadataIndexType.ORIGIN)) {
+ MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
+ tsFileMetaData.setMetaOffset(metaOffset);
+ tsFileMetaData.setMetadataIndex(metadataIndex);
+ } else {
+ BPlusTreeNode bPlusTreeIndex = flushBTreeMetadataIndex(chunkMetadataListMap);
+ tsFileMetaData.setMetadataIndex(bPlusTreeIndex);
+ }
- long footerIndex = out.getPosition();
+ long footerIndex = tsFileOutput.getPosition();
if (logger.isDebugEnabled()) {
logger.debug("start to flush the footer,file pos:{}", footerIndex);
}
// write TsFileMetaData
- int size = tsFileMetaData.serializeTo(out.wrapAsStream());
+ int size = tsFileMetaData.serializeTo(indexFileOutput.wrapAsStream());
if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
+ logger.debug(
+ "finish flushing the footer {}, file pos:{}", tsFileMetaData, tsFileOutput.getPosition());
}
// write bloom filter
- size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
+ size +=
+ tsFileMetaData.serializeBloomFilter(
+ tsFileOutput.wrapAsStream(), chunkMetadataListMap.keySet());
if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
+ logger.debug("finish flushing the bloom filter file pos:{}", tsFileOutput.getPosition());
}
// write TsFileMetaData size
- ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
+ ReadWriteIOUtils.write(
+ size, tsFileOutput.wrapAsStream()); // write the size of the file metadata.
// write magic string
- out.write(MAGIC_STRING_BYTES);
+ tsFileOutput.write(MAGIC_STRING_BYTES);
// close file
- out.close();
+ tsFileOutput.close();
if (resourceLogger.isDebugEnabled() && file != null) {
resourceLogger.debug("{} writer is closed.", file.getName());
}
@@ -305,7 +330,30 @@ public class TsFileIOWriter {
}
// construct TsFileMetadata and return
- return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
+ return MetadataIndexConstructor.constructMetadataIndex(
+ deviceTimeseriesMetadataMap, tsFileOutput, indexFileOutput);
+ }
+
+ /**
+ * Flush the metadata index in B+ tree form: resets deviceTimeseriesMetadataMap, runs
+ * flushOneChunkMetadata for every series and hands the map to BPlusTreeConstructor.
+ *
+ * @param chunkMetadataListMap chunk metadata lists keyed by series Path
+ * @return the BPlusTreeNode returned by BPlusTreeConstructor.constructMetadataIndex (presumably
+ * the root of the index tree - confirm)
+ * @throws IOException if one of the called helpers reports an I/O error
+ */
+ private BPlusTreeNode flushBTreeMetadataIndex(
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap) throws IOException {
+
+ // convert ChunkMetadataList to this field
+ deviceTimeseriesMetadataMap = new LinkedHashMap<>();
+ // create device -> TimeseriesMetaDataList Map
+ for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+ // for ordinary path
+ flushOneChunkMetadata(entry.getKey(), entry.getValue());
+ }
+
+ // build the metadata index through BPlusTreeConstructor and return its result
+ return BPlusTreeConstructor.constructMetadataIndex(
+ deviceTimeseriesMetadataMap, tsFileOutput, indexFileOutput);
}
/**
@@ -353,7 +401,7 @@ public class TsFileIOWriter {
* @throws IOException if I/O error occurs
*/
public long getPos() throws IOException {
- return out.getPosition();
+ return tsFileOutput.getPosition();
}
// device -> ChunkMetadataList
@@ -377,7 +425,7 @@ public class TsFileIOWriter {
}
public void reset() throws IOException {
- out.truncate(markedPosition);
+ tsFileOutput.truncate(markedPosition);
}
/**
@@ -386,15 +434,15 @@ public class TsFileIOWriter {
*/
public void close() throws IOException {
canWrite = false;
- out.close();
+ // The File-based constructor also opens indexFileOutput (the ".index" file), so it has to be
+ // released together with the data output. It is left unset when the writer is created from
+ // a TsFileOutput, hence the null guard. try/finally makes sure a failure while closing the
+ // data output does not leak the index output.
+ try {
+ tsFileOutput.close();
+ } finally {
+ if (indexFileOutput != null) {
+ indexFileOutput.close();
+ }
+ }
}
void writeSeparatorMaskForTest() throws IOException {
- out.write(new byte[] {MetaMarker.SEPARATOR});
+ tsFileOutput.write(new byte[] {MetaMarker.SEPARATOR});
}
void writeChunkGroupMarkerForTest() throws IOException {
- out.write(new byte[] {MetaMarker.CHUNK_GROUP_HEADER});
+ tsFileOutput.write(new byte[] {MetaMarker.CHUNK_GROUP_HEADER});
}
public File getFile() {
@@ -440,10 +488,10 @@ public class TsFileIOWriter {
}
public void writePlanIndices() throws IOException {
- ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
- ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
- ReadWriteIOUtils.write(maxPlanIndex, out.wrapAsStream());
- out.flush();
+ ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, tsFileOutput.wrapAsStream());
+ ReadWriteIOUtils.write(minPlanIndex, tsFileOutput.wrapAsStream());
+ ReadWriteIOUtils.write(maxPlanIndex, tsFileOutput.wrapAsStream());
+ tsFileOutput.flush();
}
/**
@@ -452,7 +500,7 @@ public class TsFileIOWriter {
* @return TsFileOutput
*/
public TsFileOutput getIOWriterOut() {
- return out;
+ return tsFileOutput;
}
/**
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNodeTest.java
similarity index 97%
rename from tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeTest.java
rename to tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNodeTest.java
index 78302d8..570a64c 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNodeTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/metadataIndex/MetadataIndexNodeTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tsfile.file.metadata;
+package org.apache.iotdb.tsfile.file.metadata.metadataIndex;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
index 8363855..4ead55a 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
@@ -20,13 +20,13 @@ package org.apache.iotdb.tsfile.file.metadata.utils;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.header.PageHeaderTest;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
index dbd1fa2..a1ff540 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.tsfile.file.metadata.utils;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
index 517f252..beafd0c 100755
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java
@@ -23,9 +23,14 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
@@ -42,13 +47,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
+import static org.junit.Assert.fail;
+
public class FileGenerator {
- private static final Logger LOG = LoggerFactory.getLogger(FileGenerator.class);
+ private static final Logger logger = LoggerFactory.getLogger(FileGenerator.class);
public static String outputDataFile =
TsFileGeneratorForTest.getTestTsFilePath("root.sg1", 0, 0, 0);
public static Schema schema;
private static int ROW_COUNT = 1000;
+ private static final String vectorPrefix = "vector_";
+
private static TsFileWriter innerWriter;
private static String inputDataFile;
private static String errorOutputDataFile;
@@ -216,7 +225,7 @@ public class FileGenerator {
Files.delete(file.toPath());
}
if (!file.getParentFile().mkdirs()) {
- LOG.info("Failed to create file folder {}", file.getParentFile());
+ logger.info("Failed to create file folder {}", file.getParentFile());
}
FileWriter fw = new FileWriter(file);
@@ -252,7 +261,7 @@ public class FileGenerator {
} catch (WriteProcessException e) {
e.printStackTrace();
}
- LOG.info("write to file successfully!!");
+ logger.info("write to file successfully!!");
}
private static void generateTestSchema() {
@@ -309,7 +318,7 @@ public class FileGenerator {
assert in != null;
while (in.hasNextLine()) {
if (lineCount % 1000000 == 0) {
- LOG.info("write line:{},use time:{}s", lineCount, (endTime - startTime) / 1000);
+ logger.info("write line:{},use time:{}s", lineCount, (endTime - startTime) / 1000);
}
String str = in.nextLine();
TSRecord record = RecordUtils.parseSimpleTupleRecord(str, schema);
@@ -317,7 +326,7 @@ public class FileGenerator {
lineCount++;
}
endTime = System.currentTimeMillis();
- LOG.info("write line:{},use time:{}s", lineCount, (endTime - startTime) / 1000);
+ logger.info("write line:{},use time:{}s", lineCount, (endTime - startTime) / 1000);
innerWriter.close();
in.close();
}
@@ -331,4 +340,115 @@ public class FileGenerator {
return null;
}
}
+
+ /**
+ * Writes a test TsFile at filePath, deleting any existing file there first (the test fails if
+ * the old file cannot be deleted). Any exception raised while writing is logged and turned into
+ * a test failure.
+ *
+ * <p>Non-vector part: for each i in singleMeasurement, every name in singleMeasurement[i] is
+ * registered under devices[i] as an INT64/RLE series and 10 records are written whose
+ * timestamp and value both equal the row number.
+ *
+ * <p>Vector part: for each device i and each entry k of vectorMeasurement[i], a measurement
+ * group with vectorMeasurement[i][k] INT64/RLE measurements named "sensor_" + padded index is
+ * registered and a 10-row tablet (timestamps 1..10, values starting at 1000000) is written with
+ * writeAligned.
+ *
+ * <p>NOTE(review): vectorName is only logged, never used in the schema or the tablet, and its
+ * pad width comes from vectorMeasurement.length rather than vectorMeasurement[i].length -
+ * confirm this is intended.
+ *
+ * @param filePath path of the TsFile to create
+ * @param devices device names; indexed in parallel with vectorMeasurement and singleMeasurement
+ * @param vectorMeasurement vectorMeasurement[i][k] is the number of measurements in the k-th
+ * vector of devices[i]; needs one row per device
+ * @param singleMeasurement singleMeasurement[i] holds the non-vector measurement names of
+ * devices[i]; set null if no need
+ */
+ public static void generateFile(
+ String filePath, String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
+ File f = FSFactoryProducer.getFSFactory().getFile(filePath);
+ if (f.exists() && !f.delete()) {
+ fail("can not delete " + f.getAbsolutePath());
+ }
+ Schema schema = new Schema();
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
+ // write single-variable timeseries
+ if (singleMeasurement != null) {
+ for (int i = 0; i < singleMeasurement.length; i++) {
+ String device = devices[i];
+ for (String measurement : singleMeasurement[i]) {
+ tsFileWriter.registerTimeseries(
+ new Path(device),
+ new UnaryMeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
+ }
+ // the number of record rows
+ int rowNum = 10;
+ for (int row = 0; row < rowNum; row++) {
+ TSRecord tsRecord = new TSRecord(row, device);
+ for (String measurement : singleMeasurement[i]) {
+ DataPoint dPoint = new LongDataPoint(measurement, row);
+ tsRecord.addTuple(dPoint);
+ }
+ // skip the write when the device has no single measurements at all
+ if (tsRecord.dataPointList.size() > 0) {
+ tsFileWriter.write(tsRecord);
+ }
+ }
+ }
+ }
+
+ // write multi-variable timeseries
+ for (int i = 0; i < devices.length; i++) {
+ String device = devices[i];
+ logger.info("generating device {}...", device);
+ // the number of rows to include in the tablet
+ int rowNum = 10;
+ for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
+ String vectorName =
+ vectorPrefix + generateIndexString(vectorIndex, vectorMeasurement.length);
+ logger.info("generating vector {}...", vectorName);
+ int measurementNum = vectorMeasurement[i][vectorIndex];
+ List<UnaryMeasurementSchema> schemas = new ArrayList<>();
+ List<IMeasurementSchema> tabletSchema = new ArrayList<>();
+ for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
+ String measurementName =
+ "sensor_" + generateIndexString(measurementIndex, measurementNum);
+ logger.info("generating vector measurement {}...", measurementName);
+ // add measurements into file schema (all with INT64 data type)
+ UnaryMeasurementSchema schema1 =
+ new UnaryMeasurementSchema(measurementName, TSDataType.INT64, TSEncoding.RLE);
+ schemas.add(schema1);
+ tabletSchema.add(schema1);
+ }
+ MeasurementGroup group = new MeasurementGroup(true, schemas);
+ schema.registerMeasurementGroup(new Path(device), group);
+ // add measurements into TSFileWriter
+ // construct the tablet
+ Tablet tablet = new Tablet(device, tabletSchema);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ long timestamp = 1;
+ long value = 1000000L;
+ // every measurement of a row gets the same value; the value grows by one per row
+ for (int r = 0; r < rowNum; r++, value++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = timestamp++;
+ for (int j = 0; j < measurementNum; j++) {
+ long[] sensor = (long[]) values[j];
+ sensor[row] = value;
+ }
+ // write Tablet to TsFile
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+ }
+ // write Tablet to TsFile
+ if (tablet.rowSize != 0) {
+ tsFileWriter.writeAligned(tablet);
+ tablet.reset();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("meet error in TsFileWrite with tablet", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Renders curIndex as a decimal string, left-padded with "0" until it is at least as long as
+ * the decimal string of maxIndex, so that generated names line up.
+ *
+ * @param curIndex index to render
+ * @param maxIndex index whose decimal length sets the minimum width
+ * @return the padded string, or curIndex's own digits when they are already wide enough
+ */
+ public static String generateIndexString(int curIndex, int maxIndex) {
+ String digits = String.valueOf(curIndex);
+ // number of "0" characters still needed in front; zero or negative means no padding
+ int missing = String.valueOf(maxIndex).length() - digits.length();
+ StringBuilder padded = new StringBuilder();
+ for (int k = 0; k < missing; k++) {
+ padded.append("0");
+ }
+ return padded.append(digits).toString();
+ }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/BPlusTreeIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/BPlusTreeIndexConstructorTest.java
new file mode 100644
index 0000000..b84e1ff
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/BPlusTreeIndexConstructorTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.constant.TestConstant;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.BPlusTreeNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.FileGenerator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests the metadata index of TsFiles written with MetadataIndexType.B_PLUS_TREE. */
+public class BPlusTreeIndexConstructorTest {
+ private final TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ private static final String FILE_PATH =
+ TestConstant.BASE_OUTPUT_PATH.concat("BPlusTreeIndexConstructorTest.tsfile");
+
+ private static final String measurementPrefix = "sensor_";
+ private static final String vectorPrefix = "vector_";
+ private int maxDegreeOfIndexNode; // config value saved in before(), restored in after()
+ private MetadataIndexType metadataIndexType; // config value saved in before(), restored in after()
+
+ @Before
+ public void before() {
+ maxDegreeOfIndexNode = conf.getMaxDegreeOfIndexNode();
+ conf.setMaxDegreeOfIndexNode(10);
+
+ metadataIndexType = conf.getMetadataIndexType();
+ conf.setMetadataIndexType(MetadataIndexType.B_PLUS_TREE);
+ }
+
+ @After
+ public void after() {
+ conf.setMaxDegreeOfIndexNode(maxDegreeOfIndexNode);
+ conf.setMetadataIndexType(metadataIndexType);
+ File file = new File(FILE_PATH);
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ /** Example 1: 5 entities with 5 measurements each */
+ @Test
+ public void singleIndexTest1() {
+ int deviceNum = 5;
+ int measurementNum = 5;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + i;
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 2: 1 entity with 150 measurements */
+ @Test
+ public void singleIndexTest2() {
+ int deviceNum = 1;
+ int measurementNum = 150;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + i;
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 3: 150 entities with 1 measurement each */
+ @Test
+ public void singleIndexTest3() {
+ int deviceNum = 150;
+ int measurementNum = 1;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + FileGenerator.generateIndexString(i, deviceNum);
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /** Example 4: 150 entities with 150 measurements each */
+ @Test
+ public void singleIndexTest4() {
+ int deviceNum = 150;
+ int measurementNum = 150;
+ String[] devices = new String[deviceNum];
+ int[][] vectorMeasurement = new int[deviceNum][];
+ String[][] singleMeasurement = new String[deviceNum][];
+ for (int i = 0; i < deviceNum; i++) {
+ devices[i] = "d" + FileGenerator.generateIndexString(i, deviceNum);
+ vectorMeasurement[i] = new int[0];
+ singleMeasurement[i] = new String[measurementNum];
+ for (int j = 0; j < measurementNum; j++) {
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
+ }
+ }
+ test(devices, vectorMeasurement, singleMeasurement);
+ }
+
+ /**
+ * Generates a TsFile at FILE_PATH, reads its index back and compares it with the expected lists.
+ *
+ * @param devices device names; sorted in place by this method before the comparison
+ * @param vectorMeasurement per device, the measurement count of each vector (may be empty)
+ * @param singleMeasurement per device, the non-vector measurement names; null if there are none
+ */
+ private void test(String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
+ // 1. generate file
+ FileGenerator.generateFile(FILE_PATH, devices, vectorMeasurement, singleMeasurement);
+ // 2. read metadata from file
+ List<String> actualPaths = new ArrayList<>(); // leaf entry names, in DFS traversal order
+ readMetaDataDFS(actualPaths);
+
+ List<String> actualDevices = new ArrayList<>(); // one entry per run of paths sharing a device
+ List<List<String>> actualMeasurements = new ArrayList<>(); // parallel to actualDevices
+
+ String lastDevice = null;
+ for (String path : actualPaths) { // NOTE(review): presumably device + separator + measurement
+ String device = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[0];
+ String measurement = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)[1];
+ if (!device.equals(lastDevice)) {
+ actualDevices.add(device);
+ List<String> measurements = new ArrayList<>();
+ measurements.add(measurement);
+ actualMeasurements.add(measurements);
+ } else {
+ actualMeasurements.get(actualMeasurements.size() - 1).add(measurement);
+ }
+ lastDevice = device;
+ }
+
+ // 3. generate correct result
+ List<String> correctDevices = new ArrayList<>(); // expected device names, sorted
+ List<List<String>> correctFirstMeasurements =
+ new ArrayList<>(); // all expected measurements per device (input order), each list sorted
+ generateCorrectResult(
+ correctDevices, correctFirstMeasurements, devices, vectorMeasurement, singleMeasurement);
+ // 4. compare correct result with TsFile's metadata
+ Arrays.sort(devices);
+ // 4.1 make sure device in order
+ assertEquals(correctDevices.size(), devices.length);
+ assertEquals(actualDevices.size(), correctDevices.size());
+ for (int i = 0; i < actualDevices.size(); i++) {
+ assertEquals(actualDevices.get(i), correctDevices.get(i));
+ }
+ // 4.2 make sure timeseries in order
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+ reader.getAllTimeseriesMetadata();
+ for (int j = 0; j < actualDevices.size(); j++) {
+ for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
+ assertEquals(
+ allTimeseriesMetadata.get(actualDevices.get(j)).get(i).getMeasurementId(),
+ correctFirstMeasurements.get(j).get(i));
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ // 4.3 make sure split leaf correctly: i-th collected name vs. expected entry i * max degree
+ for (int j = 0; j < actualDevices.size(); j++) {
+ for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
+ assertEquals(
+ actualMeasurements.get(j).get(i),
+ correctFirstMeasurements.get(j).get(i * conf.getMaxDegreeOfIndexNode()));
+ }
+ }
+ }
+
+ /**
+ * Reads the file metadata of FILE_PATH and walks the node from getMetadataIndex() via deviceDFS.
+ *
+ * @param paths output; receives the entry names collected during the walk
+ */
+ private void readMetaDataDFS(List<String> paths) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ TsFileMetadata tsFileMetaData = reader.readFileMetadata();
+ BPlusTreeNode metadataIndexNode = (BPlusTreeNode) tsFileMetaData.getMetadataIndex();
+ deviceDFS(paths, reader, metadataIndexNode);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** Depth-first walk: adds child entry names of leaf nodes to paths, recurses into other nodes */
+ private void deviceDFS(List<String> paths, TsFileSequenceReader reader, BPlusTreeNode node) {
+ try {
+ for (int i = 0; i < node.getChildren().size(); i++) {
+ MetadataIndexEntry metadataIndexEntry = node.getChildren().get(i);
+ long endOffset = node.getEndOffset(); // last child; others end at the next child's offset
+ if (i != node.getChildren().size() - 1) {
+ endOffset = node.getChildren().get(i + 1).getOffset();
+ }
+ BPlusTreeNode subNode = // NOTE(review): also read when node is a leaf, where it is unused
+ reader.getBPlusTreeIndexNode(metadataIndexEntry.getOffset(), endOffset);
+ if (node.isLeaf()) {
+ paths.add(metadataIndexEntry.getName());
+ } else {
+ deviceDFS(paths, reader, subNode);
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Builds the expected devices and measurements for the test. Note that if the metadata index
+ * tree is re-designed, you may need to modify this function as well.
+ *
+ * @param correctDevices output; all device names, sorted
+ * @param correctMeasurements output; per input device (input order), its sorted measurement names
+ * @param devices input; device names
+ * @param vectorMeasurement input; per device, the measurement count of each vector
+ * @param singleMeasurement input; per device, the non-vector measurement names, may be null
+ */
+ private void generateCorrectResult(
+ List<String> correctDevices,
+ List<List<String>> correctMeasurements,
+ String[] devices,
+ int[][] vectorMeasurement,
+ String[][] singleMeasurement) {
+ for (int i = 0; i < devices.length; i++) {
+ String device = devices[i];
+ correctDevices.add(device);
+ // generate measurement and sort
+ List<String> measurements = new ArrayList<>();
+ // single-variable measurement
+ if (singleMeasurement != null) {
+ measurements.addAll(Arrays.asList(singleMeasurement[i]));
+ }
+ // multi-variable measurement
+ for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
+ String vectorName = // NOTE(review): padded to the outer array length, not [i].length — confirm
+ vectorPrefix + FileGenerator.generateIndexString(vectorIndex, vectorMeasurement.length);
+ measurements.add(vectorName);
+ int measurementNum = vectorMeasurement[i][vectorIndex];
+ for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
+ String measurementName =
+ measurementPrefix
+ + FileGenerator.generateIndexString(measurementIndex, measurementNum);
+ measurements.add(vectorName + TsFileConstant.PATH_SEPARATOR + measurementName);
+ }
+ }
+ Collections.sort(measurements);
+ correctMeasurements.add(measurements);
+ }
+ Collections.sort(correctDevices);
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
index ee73f8f..4878ce1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
@@ -22,30 +22,18 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.constant.TestConstant;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.metadataIndex.MetadataIndexType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.MeasurementGroup;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.Schema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.utils.FileGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -61,24 +49,27 @@ import static org.junit.Assert.fail;
/** test for MetadataIndexConstructor */
public class MetadataIndexConstructorTest {
- private static final Logger logger = LoggerFactory.getLogger(MetadataIndexConstructorTest.class);
private final TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
private static final String FILE_PATH =
TestConstant.BASE_OUTPUT_PATH.concat("MetadataIndexConstructorTest.tsfile");
private static final String measurementPrefix = "sensor_";
- private static final String vectorPrefix = "vector_";
private int maxDegreeOfIndexNode;
+ private MetadataIndexType metadataIndexType;
@Before
public void before() {
maxDegreeOfIndexNode = conf.getMaxDegreeOfIndexNode();
conf.setMaxDegreeOfIndexNode(10);
+
+ metadataIndexType = conf.getMetadataIndexType();
+ conf.setMetadataIndexType(MetadataIndexType.ORIGIN);
}
@After
public void after() {
conf.setMaxDegreeOfIndexNode(maxDegreeOfIndexNode);
+ conf.setMetadataIndexType(metadataIndexType);
File file = new File(FILE_PATH);
if (file.exists()) {
file.delete();
@@ -98,7 +89,8 @@ public class MetadataIndexConstructorTest {
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
- singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
@@ -117,7 +109,8 @@ public class MetadataIndexConstructorTest {
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
- singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
@@ -132,11 +125,12 @@ public class MetadataIndexConstructorTest {
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
- devices[i] = "d" + generateIndexString(i, deviceNum);
+ devices[i] = "d" + FileGenerator.generateIndexString(i, deviceNum);
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
- singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
@@ -155,7 +149,8 @@ public class MetadataIndexConstructorTest {
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
- singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
+ singleMeasurement[i][j] =
+ measurementPrefix + FileGenerator.generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
@@ -194,7 +189,7 @@ public class MetadataIndexConstructorTest {
*/
private void test(String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
// 1. generate file
- generateFile(devices, vectorMeasurement, singleMeasurement);
+ FileGenerator.generateFile(FILE_PATH, devices, vectorMeasurement, singleMeasurement);
// 2. read metadata from file
List<String> actualDevices = new ArrayList<>(); // contains all device by sequence
List<List<String>> actualMeasurements =
@@ -349,7 +344,8 @@ public class MetadataIndexConstructorTest {
int measurementNum = vectorMeasurement[i][vectorIndex];
for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
String measurementName =
- measurementPrefix + generateIndexString(measurementIndex, measurementNum);
+ measurementPrefix
+ + FileGenerator.generateIndexString(measurementIndex, measurementNum);
measurements.add(TsFileConstant.PATH_SEPARATOR + measurementName);
}
}
@@ -358,115 +354,4 @@ public class MetadataIndexConstructorTest {
}
Collections.sort(correctDevices);
}
-
- /**
- * @param devices name and number of device
- * @param vectorMeasurement the number of device and the number of values to include in the tablet
- * @param singleMeasurement non-vector measurement name, set null if no need
- */
- private void generateFile(
- String[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
- File f = FSFactoryProducer.getFSFactory().getFile(FILE_PATH);
- if (f.exists() && !f.delete()) {
- fail("can not delete " + f.getAbsolutePath());
- }
- Schema schema = new Schema();
- try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
- // write single-variable timeseries
- if (singleMeasurement != null) {
- for (int i = 0; i < singleMeasurement.length; i++) {
- String device = devices[i];
- for (String measurement : singleMeasurement[i]) {
- tsFileWriter.registerTimeseries(
- new Path(device),
- new UnaryMeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
- }
- // the number of record rows
- int rowNum = 10;
- for (int row = 0; row < rowNum; row++) {
- TSRecord tsRecord = new TSRecord(row, device);
- for (String measurement : singleMeasurement[i]) {
- DataPoint dPoint = new LongDataPoint(measurement, row);
- tsRecord.addTuple(dPoint);
- }
- if (tsRecord.dataPointList.size() > 0) {
- tsFileWriter.write(tsRecord);
- }
- }
- }
- }
-
- // write multi-variable timeseries
- for (int i = 0; i < devices.length; i++) {
- String device = devices[i];
- logger.info("generating device {}...", device);
- // the number of rows to include in the tablet
- int rowNum = 10;
- for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
- String vectorName =
- vectorPrefix + generateIndexString(vectorIndex, vectorMeasurement.length);
- logger.info("generating vector {}...", vectorName);
- int measurementNum = vectorMeasurement[i][vectorIndex];
- List<UnaryMeasurementSchema> schemas = new ArrayList<>();
- List<IMeasurementSchema> tabletSchema = new ArrayList<>();
- for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
- String measurementName =
- measurementPrefix + generateIndexString(measurementIndex, measurementNum);
- logger.info("generating vector measurement {}...", measurementName);
- // add measurements into file schema (all with INT64 data type)
- UnaryMeasurementSchema schema1 =
- new UnaryMeasurementSchema(measurementName, TSDataType.INT64, TSEncoding.RLE);
- schemas.add(schema1);
- tabletSchema.add(schema1);
- }
- MeasurementGroup group = new MeasurementGroup(true, schemas);
- schema.registerMeasurementGroup(new Path(device), group);
- // add measurements into TSFileWriter
- // construct the tablet
- Tablet tablet = new Tablet(device, tabletSchema);
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
- long timestamp = 1;
- long value = 1000000L;
- for (int r = 0; r < rowNum; r++, value++) {
- int row = tablet.rowSize++;
- timestamps[row] = timestamp++;
- for (int j = 0; j < measurementNum; j++) {
- long[] sensor = (long[]) values[j];
- sensor[row] = value;
- }
- // write Tablet to TsFile
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- tsFileWriter.writeAligned(tablet);
- tablet.reset();
- }
- }
- // write Tablet to TsFile
- if (tablet.rowSize != 0) {
- tsFileWriter.writeAligned(tablet);
- tablet.reset();
- }
- }
- }
- } catch (Exception e) {
- logger.error("meet error in TsFileWrite with tablet", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * generate curIndex string, use "0" on left to make sure align
- *
- * @param curIndex current index
- * @param maxIndex max index
- * @return curIndex's string
- */
- private String generateIndexString(int curIndex, int maxIndex) {
- StringBuilder res = new StringBuilder(String.valueOf(curIndex));
- String target = String.valueOf(maxIndex);
- while (res.length() < target.length()) {
- res.insert(0, "0");
- }
- return res.toString();
- }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/AlignedChunkWriterImplTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/AlignedChunkWriterImplTest.java
index 2e756d3..980d7b5 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/AlignedChunkWriterImplTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/AlignedChunkWriterImplTest.java
@@ -60,7 +60,7 @@ public class AlignedChunkWriterImplTest {
try {
TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeToFileWriter(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
@@ -134,7 +134,7 @@ public class AlignedChunkWriterImplTest {
try {
TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeToFileWriter(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index e24b4d5..d63a5fa 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -105,7 +105,7 @@ public class RestorableTsFileIOWriterTest {
public void testOnlyFirstMask() throws Exception {
TsFileWriter writer = new TsFileWriter(file);
// we have to flush using inner API.
- writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
+ writer.getIOWriter().tsFileOutput.write(new byte[] {MetaMarker.CHUNK_HEADER});
writer.getIOWriter().close();
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
@@ -177,7 +177,7 @@ public class RestorableTsFileIOWriterTest {
writer.flushAllChunkGroups();
long pos2 = writer.getIOWriter().getPos();
// let's delete one byte. the version is broken
- writer.getIOWriter().out.truncate(pos2 - 1);
+ writer.getIOWriter().tsFileOutput.truncate(pos2 - 1);
writer.getIOWriter().close();
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
index bdca8d5..057b7f7 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@ -55,7 +55,7 @@ public class TimeChunkWriterTest {
try {
TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeAllPagesOfChunkToTsFile(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
@@ -92,7 +92,7 @@ public class TimeChunkWriterTest {
try {
TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeAllPagesOfChunkToTsFile(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
index 3cc8272..84c29ce 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
@@ -54,7 +54,7 @@ public class ValueChunkWriterTest {
try {
TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeAllPagesOfChunkToTsFile(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
@@ -87,10 +87,9 @@ public class ValueChunkWriterTest {
chunkWriter.sealCurrentPage();
// two pages with statistics size: (69 + 41) * 2 + chunk header size: 9
assertEquals(229L, chunkWriter.getCurrentChunkSize());
-
- TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
- TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput);
chunkWriter.writeAllPagesOfChunkToTsFile(writer);
PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());