Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/29 07:39:11 UTC
[incubator-iotdb] branch refactor_tsfile updated: add all writer V2
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch refactor_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_tsfile by this push:
new d6fc68b add all writer V2
d6fc68b is described below
commit d6fc68ba5471d79c70871eac7a6224bcbbf1e6d7
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Nov 29 15:38:52 2019 +0800
add all writer V2
---
.../tsfile/file/metadataV2/TsFileMetadataV2.java | 13 +-
.../apache/iotdb/tsfile/write/TsFileWriterV2.java | 282 +++++++++++++++
.../tsfile/write/chunk/ChunkWriterImplV2.java | 281 +++++++++++++++
.../iotdb/tsfile/write/page/PageWriterV2.java | 389 +++++++++++++++++++++
.../write/record/datapoint/BooleanDataPoint.java | 10 +
.../tsfile/write/record/datapoint/DataPoint.java | 3 +
.../write/record/datapoint/DoubleDataPoint.java | 10 +
.../write/record/datapoint/FloatDataPoint.java | 10 +
.../write/record/datapoint/IntDataPoint.java | 10 +
.../write/record/datapoint/LongDataPoint.java | 10 +
.../write/record/datapoint/StringDataPoint.java | 10 +
.../iotdb/tsfile/write/schemaV2/SchemaV2.java | 18 +-
.../tsfile/write/schemaV2/TimeseriesSchema.java | 67 ----
.../tsfile/write/writer/TsFileIOWriterV2.java | 371 ++++++++++++++++++++
14 files changed, 1404 insertions(+), 80 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
index 13f0f44..8a55180 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
@@ -33,21 +33,14 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
/**
* TSFileMetaData collects all metadata info and saves it in its data structures.
*/
public class TsFileMetadataV2 {
- private SchemaV2 schemaV2;
-
// fields below are IoTDB extensions and they do not affect TsFile's stand-alone functionality
private int totalChunkNum;
// invalid means a chunk has been rewritten by merge and the chunk's data is in
@@ -57,11 +50,7 @@ public class TsFileMetadataV2 {
// bloom filter
private BloomFilter bloomFilter;
- private List<ChunkMetaData> chunkMetaDataList;
-
- private TreeMap<Path, List<ChunkMetaData>> timeseriesMetadata;
-
- private List<Pair<Path, Long>> seriesMetadataIndex;
+ private long[] seriesMetadataIndex;
/**
* deserialize data from the buffer.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java
new file mode 100644
index 0000000..1f1d9ae
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriterV2;
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileWriterV2 is the entrance for the writing process. It receives a record, sends it to the
+ * corresponding chunk writer, checks the memory consumed by all chunk writers against its flush
+ * strategy, and flushes data held in memory to the OutputStream. At the end of writing, the user
+ * should call {@code close()} to flush the last data and close the output stream.
+ */
+public class TsFileWriterV2 implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TsFileWriterV2.class);
+ protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
+ /**
+ * schema of this TsFile.
+ **/
+ protected final SchemaV2 schema;
+ /**
+ * IO writer of this TsFile.
+ **/
+ private final TsFileIOWriterV2 fileWriter;
+ private final int pageSize;
+ private long recordCount = 0;
+
+ private Map<Path, ChunkWriterImplV2> chunkWriters = new HashMap<>();
+
+ /**
+ * minimum number of records to buffer before the first memory check.
+ **/
+ private long recordCountForNextMemCheck = 100;
+ private long chunkGroupSizeThreshold;
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ public TsFileWriterV2(File file) throws IOException {
+ this(new TsFileIOWriterV2(file), new SchemaV2(), TSFileDescriptor.getInstance().getConfig());
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param fileWriter the io writer of this TsFile
+ */
+ public TsFileWriterV2(TsFileIOWriterV2 fileWriter) throws IOException {
+ this(fileWriter, new SchemaV2(), TSFileDescriptor.getInstance().getConfig());
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param file the File to be written by this TsFileWriter
+ * @param schema the schema of this TsFile
+ */
+ public TsFileWriterV2(File file, SchemaV2 schema) throws IOException {
+ this(new TsFileIOWriterV2(file), schema, TSFileDescriptor.getInstance().getConfig());
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param output the TsFileOutput of the file to be written by this TsFileWriter
+ * @param schema the schema of this TsFile
+ * @throws IOException
+ */
+ public TsFileWriterV2(TsFileOutput output, SchemaV2 schema) throws IOException {
+ this(new TsFileIOWriterV2(output), schema, TSFileDescriptor.getInstance().getConfig());
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param file the File to be written by this TsFileWriter
+ * @param schema the schema of this TsFile
+ * @param conf the configuration of this TsFile
+ */
+ public TsFileWriterV2(File file, SchemaV2 schema, TSFileConfig conf) throws IOException {
+ this(new TsFileIOWriterV2(file), schema, conf);
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
+ * @param fileWriter the io writer of this TsFile
+ * @param schema the schema of this TsFile
+ * @param conf the configuration of this TsFile
+ */
+ protected TsFileWriterV2(TsFileIOWriterV2 fileWriter, SchemaV2 schema, TSFileConfig conf)
+ throws IOException {
+ if (!fileWriter.canWrite()) {
+ throw new IOException(
+ "the given file Writer does not support writing any more. Maybe it is an complete TsFile");
+ }
+ this.fileWriter = fileWriter;
+ this.schema = schema;
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
+ config.setTSFileStorageFs(conf.getTSFileStorageFs().name());
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
+ + " size or decrease page size. ", pageSize, chunkGroupSizeThreshold);
+ }
+ }
+
+ /**
+ * register a map of device templates (template name -> TimeseriesSchema) to this TsFile.
+ */
+ public void addDeviceTemplates(Map<String, TimeseriesSchema> template) throws WriteProcessException {
+
+ }
+
+ /**
+ * Confirm that every series in the record is registered in the schema, and create a chunk
+ * writer for any series that does not have one yet.
+ *
+ * @param record - a record corresponding to a data line
+ * @return - whether the record can be written legally
+ * @throws WriteProcessException if a series in the record is not registered
+ */
+ private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
+ for (DataPoint dataPoint: record.dataPointList) {
+ Path path = new Path(record.deviceId, dataPoint.getMeasurementId());
+ if (chunkWriters.containsKey(path)) {
+ continue;
+ }
+
+ if (!schema.containsTimeseries(path)) {
+ throw new WriteProcessException(
+ "Time series not registered: " + record.deviceId + "." + dataPoint.getMeasurementId());
+ } else {
+ chunkWriters.put(path, new ChunkWriterImplV2(schema.getSeriesSchema(path)));
+ }
+ }
+
+ return true;
+ }
+
+
+ /**
+ * write a TSRecord.
+ *
+ * @param record - record corresponding to a data line
+ * @return true - size of tsfile or metadata reaches the threshold. false - otherwise
+ * @throws IOException exception in IO
+ * @throws WriteProcessException exception in write process
+ */
+ public boolean write(TSRecord record) throws IOException, WriteProcessException {
+ // make sure the ChunkGroupWriter for this TSRecord exist
+ checkIsTimeSeriesExist(record);
+ // get corresponding ChunkGroupWriter and write this TSRecord
+
+ for (DataPoint dataPoint: record.dataPointList) {
+ dataPoint.writeTo(record.time, chunkWriters.get(new Path(record.deviceId, dataPoint.getMeasurementId())));
+ }
+
+ ++recordCount;
+ return checkMemorySizeAndMayFlushChunks();
+ }
+
+
+ /**
+ * calculate the total memory size currently occupied by all chunk writers.
+ *
+ * @return total memory size used
+ */
+ private long calculateMemSizeForAllGroup() {
+ long memTotalSize = 0;
+ for (ChunkWriterImplV2 chunkWriter : chunkWriters.values()) {
+ memTotalSize += chunkWriter.estimateMaxSeriesMemSize();
+ }
+ return memTotalSize;
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the chunkGroupSize threshold, flush them to given
+ * OutputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false - otherwise
+ * @throws IOException exception in IO
+ */
+ private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) {
+ long memSize = calculateMemSizeForAllGroup();
+ assert memSize > 0;
+ recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
+ if (memSize > chunkGroupSizeThreshold) {
+ LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
+ return flushAllChunks();
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * flush the data in all series writers of all chunk group writers and their page writers to
+ * outputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false - otherwise. This
+ * implementation always returns false; an IoTDB override may return true.
+ * @throws IOException exception in IO
+ */
+ private boolean flushAllChunks() throws IOException {
+ for (ChunkWriterImplV2 chunkWriter: chunkWriters.values()) {
+ chunkWriter.writeToFileWriter(fileWriter);
+ }
+ return false;
+ }
+
+
+ /**
+ * call this method to flush the last data remaining in memory and close the
+ * output stream.
+ *
+ * @throws IOException exception in IO
+ */
+ @Override
+ public void close() throws IOException {
+ LOG.info("start close file");
+ flushAllChunks();
+ fileWriter.endFile(this.schema);
+ }
+
+ /**
+ * this function is only for Test.
+ * @return TsFileIOWriter
+ */
+ public TsFileIOWriterV2 getIOWriter() {
+ return this.fileWriter;
+ }
+
+ /**
+ * this function is only for Test
+ * @throws IOException exception in IO
+ */
+ public void flushForTest() throws IOException {
+ flushAllChunks();
+ }
+}
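
For orientation, here is a minimal end-to-end usage sketch of the new writer. TsFileWriterV2,
SchemaV2.registerTimeseries, TSRecord, Path, and LongDataPoint all appear in this commit or the
existing tree; the TimeseriesSchema constructor arguments are an assumption, since its
constructor is not shown in this diff.

    // Hedged sketch: intended end-to-end use of TsFileWriterV2.
    // Assumption: TimeseriesSchema has a (measurementId, dataType) constructor.
    SchemaV2 schema = new SchemaV2();
    schema.registerTimeseries(new Path("d1", "s1"),
        new TimeseriesSchema("s1", TSDataType.INT64));
    try (TsFileWriterV2 writer = new TsFileWriterV2(new File("test.tsfile"), schema)) {
      TSRecord record = new TSRecord(1L, "d1");            // timestamp 1, device d1
      record.addTuple(new LongDataPoint("s1", 42L));       // one point for series d1.s1
      writer.write(record);                                // may trigger a memory-based flush
    }                                                      // close() flushes and ends the file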
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java
new file mode 100644
index 0000000..5290240
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.PageWriter;
+import org.apache.iotdb.tsfile.write.page.PageWriterV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriterV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChunkWriterImplV2 {
+
+ private static final Logger logger = LoggerFactory.getLogger(ChunkWriterImplV2.class);
+
+ private TimeseriesSchema timeseriesSchema;
+
+ private ICompressor compressor;
+
+ /**
+ * all pages of this column.
+ */
+ private PublicBAOS pageBuffer;
+
+ private int numOfPages;
+
+ /**
+ * write data into current page
+ */
+ private PageWriterV2 pageWriter;
+
+ /**
+ * page size threshold.
+ */
+ private final long pageSizeThreshold;
+
+ private final int maxNumberOfPointsInPage;
+
+ /**
+ * the value count in the current page at which the page size will be checked next.
+ */
+ private int valueCountInOnePageForNextCheck;
+
+ // initial value for valueCountInOnePageForNextCheck
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+
+ /**
+ * statistic of this chunk.
+ */
+ private Statistics<?> chunkStatistics;
+
+ /**
+ * @param schema schema of this measurement
+ */
+ public ChunkWriterImplV2(TimeseriesSchema schema) {
+ this.timeseriesSchema = schema;
+ this.compressor = ICompressor.getCompressor(schema.getCompressionType());
+ this.pageBuffer = new PublicBAOS();
+
+ this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig()
+ .getMaxNumberOfPointsInPage();
+ // delay the first page-size check until enough points have been written to make a prediction
+ this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+ // init statistics for this chunk and page
+ this.chunkStatistics = Statistics.getStatsByType(timeseriesSchema.getType());
+
+ this.pageWriter = new PageWriterV2(timeseriesSchema);
+ this.pageWriter.setTimeEncoder(timeseriesSchema.getTimeEncoder());
+ this.pageWriter.setValueEncoder(timeseriesSchema.getValueEncoder());
+ }
+
+ public void write(long time, long value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long time, int value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long time, boolean value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long time, float value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long time, double value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long time, Binary value) {
+ pageWriter.write(time, value);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, int[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, long[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, float[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, double[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
+ pageWriter.write(timestamps, values, batchSize);
+ checkPageSizeAndMayOpenANewPage();
+ }
+
+ /**
+ * check the occupied memory size; if it exceeds the page size threshold, seal the current
+ * page into the page buffer and open a new one.
+ */
+ private void checkPageSizeAndMayOpenANewPage() {
+ if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
+ logger.debug("current line count reaches the upper bound, write page {}", timeseriesSchema);
+ writePage();
+ } else if (pageWriter.getPointNumber()
+ >= valueCountInOnePageForNextCheck) { // need to check memory size
+ // not checking the memory used for every value
+ long currentPageSize = pageWriter.estimateMaxMemSize();
+ if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+ // we will write the current page
+ logger.debug(
+ "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
+ timeseriesSchema.getMeasurementId(), pageSizeThreshold, currentPageSize,
+ pageWriter.getPointNumber());
+ writePage();
+ valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ } else {
+ // reset the valueCountInOnePageForNextCheck for the next page
+ valueCountInOnePageForNextCheck = (int) (((float) pageSizeThreshold / currentPageSize)
+ * pageWriter.getPointNumber());
+ }
+ }
+ }
+
+ private void writePage() {
+ try {
+ pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer);
+
+ // update statistics of this chunk
+ numOfPages++;
+ this.chunkStatistics.mergeStatistics(pageWriter.getStatistics());
+ } catch (IOException e) {
+ logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+ } finally {
+ // clear start time stamp for next initializing
+ pageWriter.reset(timeseriesSchema);
+ }
+ }
+
+ public void writeToFileWriter(TsFileIOWriterV2 tsfileWriter) throws IOException {
+ // seal current page
+ if (pageWriter.getPointNumber() > 0) {
+ writePage();
+ }
+
+ writeAllPagesOfChunkToTsFile(tsfileWriter, chunkStatistics);
+
+ // reinit this chunk writer
+ pageBuffer.reset();
+ this.chunkStatistics = Statistics.getStatsByType(timeseriesSchema.getType());
+ }
+
+ public long estimateMaxSeriesMemSize() {
+ return pageWriter.estimateMaxMemSize() + this.estimateMaxPageMemSize();
+ }
+
+ public long getCurrentChunkSize() {
+ // return the serialized size of the chunk header + all pages
+ return ChunkHeader.getSerializedSize(timeseriesSchema.getMeasurementId()) + this
+ .getCurrentDataSize();
+ }
+
+ public int getNumOfPages() {
+ return numOfPages;
+ }
+
+ public TSDataType getDataType() {
+ return timeseriesSchema.getType();
+ }
+
+ /**
+ * write all pages of this chunk to the specified IOWriter.
+ *
+ * @param writer the specified IOWriter
+ * @param statistics the chunk statistics
+ * @throws IOException exception in IO
+ */
+ public void writeAllPagesOfChunkToTsFile(TsFileIOWriterV2 writer, Statistics<?> statistics)
+ throws IOException {
+ if (statistics.getCount() == 0) {
+ return;
+ }
+
+ // start to write this column chunk
+ writer.startFlushChunk(timeseriesSchema, compressor.getType(), timeseriesSchema.getType(),
+ timeseriesSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
+
+ // write all pages of this column
+ writer.writeBytesToStream(pageBuffer);
+
+ writer.endCurrentChunk();
+ }
+
+ /**
+ * estimate max page memory size.
+ *
+ * @return the max possible allocated size currently
+ */
+ private long estimateMaxPageMemSize() {
+ // return the sum of size of buffer and page max size
+ return (long) (pageBuffer.size() +
+ PageHeader.calculatePageHeaderSizeWithoutStatistics() +
+ pageWriter.getStatistics().getSerializedSize());
+ }
+
+ /**
+ * get current data size.
+ *
+ * @return current data size that the writer has serialized.
+ */
+ private long getCurrentDataSize() {
+ return pageBuffer.size();
+ }
+}
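
A note on the adaptive check in checkPageSizeAndMayOpenANewPage above: instead of estimating
memory on every point, the writer extrapolates the point count at which the page should reach
the size threshold and only re-checks there. A worked sketch of that extrapolation (the helper
name is ours, not the commit's):

    // Hedged sketch of the extrapolation in checkPageSizeAndMayOpenANewPage.
    static int nextCheckPoint(long pageSizeThreshold, long currentPageSize, int pointNumber) {
      return (int) (((float) pageSizeThreshold / currentPageSize) * pointNumber);
    }
    // e.g. 1500 points occupying 30 KB under a 64 KB threshold defer the next check
    // to about (64.0 / 30) * 1500 = 3200 points.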
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java
new file mode 100644
index 0000000..fdacd6c
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.page;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This writer is used to write time-value into a page. It consists of a time encoder,
+ * a value encoder and respective OutputStream.
+ */
+public class PageWriterV2 {
+
+ private static final Logger logger = LoggerFactory.getLogger(PageWriterV2.class);
+
+ // time of the latest written time value pair, we assume data is written in time order
+ private long pageMaxTime;
+ private long pageMinTime = Long.MIN_VALUE;
+
+ private ICompressor compressor;
+
+ // time
+ private Encoder timeEncoder;
+ private PublicBAOS timeOut;
+ // value
+ private Encoder valueEncoder;
+ private PublicBAOS valueOut;
+
+ /**
+ * statistic of current page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
+ */
+ private Statistics<?> statistics;
+ private int pointNumber;
+
+ public PageWriterV2() {
+ this(null, null);
+ }
+
+ public PageWriterV2(TimeseriesSchema timeseriesSchema) {
+ this(timeseriesSchema.getTimeEncoder(), timeseriesSchema.getValueEncoder());
+ this.statistics = Statistics.getStatsByType(timeseriesSchema.getType());
+ this.compressor = ICompressor.getCompressor(timeseriesSchema.getCompressionType());
+ }
+
+ private PageWriterV2(Encoder timeEncoder, Encoder valueEncoder) {
+ this.timeOut = new PublicBAOS();
+ this.valueOut = new PublicBAOS();
+ this.timeEncoder = timeEncoder;
+ this.valueEncoder = valueEncoder;
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, boolean value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, short value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, int value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, long value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, float value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, double value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write a time value pair into encoder
+ */
+ public void write(long time, Binary value) {
+ ++pointNumber;
+ this.pageMaxTime = time;
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = time;
+ }
+ timeEncoder.encode(time, timeOut);
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, int[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, long[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, float[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, double[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * write time series into encoder
+ */
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
+ pointNumber += batchSize;
+ this.pageMaxTime = timestamps[batchSize - 1];
+ if (pageMinTime == Long.MIN_VALUE) {
+ pageMinTime = timestamps[0];
+ }
+ for (int i = 0; i < batchSize; i++) {
+ timeEncoder.encode(timestamps[i], timeOut);
+ valueEncoder.encode(values[i], valueOut);
+ }
+ statistics.update(timestamps, values, batchSize);
+ }
+
+ /**
+ * flush all data remained in encoders.
+ */
+ private void prepareEndWriteOnePage() throws IOException {
+ timeEncoder.flush(timeOut);
+ valueEncoder.flush(valueOut);
+ }
+
+ /**
+ * getUncompressedBytes returns the data that has been written, in the form of
+ * <code>size of time list, time list, value list</code>.
+ *
+ * @return a new readable ByteBuffer whose position is 0.
+ */
+ public ByteBuffer getUncompressedBytes() throws IOException {
+ prepareEndWriteOnePage();
+ ByteBuffer buffer = ByteBuffer.allocate(timeOut.size() + valueOut.size() + 4);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(timeOut.size(), buffer);
+ buffer.put(timeOut.getBuf(), 0, timeOut.size());
+ buffer.put(valueOut.getBuf(), 0, valueOut.size());
+ buffer.flip();
+ return buffer;
+ }
+
+ public long getPageMaxTime() {
+ return pageMaxTime;
+ }
+
+ public long getPageMinTime() {
+ return pageMinTime;
+ }
+
+ /**
+ * write the page header and data into the PageWriter's output stream.
+ */
+ public void writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer) throws IOException {
+ if (pointNumber == 0) {
+ return;
+ }
+
+ ByteBuffer pageData = getUncompressedBytes();
+ int uncompressedSize = pageData.remaining();
+ int compressedSize;
+ int compressedPosition = 0;
+ byte[] compressedBytes = null;
+
+ if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+ compressedSize = pageData.remaining();
+ } else {
+ compressedBytes = new byte[compressor.getMaxBytesForCompression(uncompressedSize)];
+ // data is never a directByteBuffer now, so we can use data.array()
+ compressedSize = compressor
+ .compress(pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
+ }
+
+ // write the page header to IOWriter
+ PageHeader header = new PageHeader(uncompressedSize, compressedSize, pointNumber, statistics,
+ pageMaxTime, pageMinTime);
+ header.serializeTo(pageBuffer);
+
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ logger.debug("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
+ if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+ channel.write(pageData);
+ } else {
+ pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
+ }
+ logger.debug("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
+ }
+ }
+
+ /**
+ * calculate the max possible memory size this page occupies, i.e. the bytes already in the
+ * time and value output streams plus the max byte size either encoder may still hold.
+ *
+ * @return max possible allocated size of time data, value data and both encoders
+ */
+ public long estimateMaxMemSize() {
+ return timeOut.size() + valueOut.size() + timeEncoder.getMaxByteSize() + valueEncoder
+ .getMaxByteSize();
+ }
+
+ /**
+ * reset this page
+ */
+ public void reset(TimeseriesSchema measurementSchema) {
+ timeOut.reset();
+ valueOut.reset();
+ pointNumber = 0;
+ pageMinTime = Long.MIN_VALUE;
+ pageMaxTime = Long.MIN_VALUE;
+ statistics = Statistics.getStatsByType(measurementSchema.getType());
+ }
+
+ public void setTimeEncoder(Encoder encoder) {
+ this.timeEncoder = encoder;
+ }
+
+ public void setValueEncoder(Encoder encoder) {
+ this.valueEncoder = encoder;
+ }
+
+ public void initStatistics(TSDataType dataType) {
+ statistics = Statistics.getStatsByType(dataType);
+ }
+
+ public int getPointNumber() {
+ return pointNumber;
+ }
+
+ public Statistics<?> getStatistics() {
+ return statistics;
+ }
+
+}
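
The buffer produced by getUncompressedBytes() packs the two encoder outputs back to back. A
hedged sketch of how a reader would split it again, following the layout written above; we
assume ReadWriteForEncodingUtils.readUnsignedVarInt(ByteBuffer) mirrors the writeUnsignedVarInt
call:

    // layout: unsigned varint (time section length) | time bytes | value bytes
    ByteBuffer page = pageWriter.getUncompressedBytes();
    int timeLength = ReadWriteForEncodingUtils.readUnsignedVarInt(page);
    ByteBuffer timeSection = page.slice();   // starts at the time bytes
    timeSection.limit(timeLength);
    page.position(page.position() + timeLength);
    ByteBuffer valueSection = page.slice();  // the remainder is the value section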
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
index 7fd5c19..21a5465 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,15 @@ public class BooleanDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
index 0d0f057..5ac7286 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.StringContainer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import java.io.IOException;
@@ -107,6 +108,8 @@ public abstract class DataPoint {
*/
public abstract void writeTo(long time, IChunkWriter writer) throws IOException;
+ public abstract void writeTo(long time, ChunkWriterImplV2 writer) throws IOException;
+
public String getMeasurementId() {
return measurementId;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
index de6eb0f..dbeb756 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,15 @@ public class DoubleDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
index af3d919..d613f20 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,15 @@ public class FloatDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
index 38cbfd0..8e695bf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,15 @@ public class IntDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
index 2258816..fedd6cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,15 @@ public class LongDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
index 047a724..c741a10 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,15 @@ public class StringDataPoint extends DataPoint {
}
@Override
+ public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+ if (writer == null) {
+ LOG.warn("given IChunkWriter is null, do nothing and return");
+ return;
+ }
+ writer.write(time, value);
+ }
+
+ @Override
public Object getValue() {
return value;
}
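
The six near-identical overloads above implement double dispatch: TsFileWriterV2 holds untyped
DataPoints, and each subclass picks the matching primitive overload of ChunkWriterImplV2.write.
A minimal sketch, with chunkWriter standing in for any ChunkWriterImplV2 instance:

    DataPoint dp = new LongDataPoint("s1", 42L);
    dp.writeTo(1L, chunkWriter);  // resolves to chunkWriter.write(long time, long value)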
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
index 60c3d0b..8a89ab9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.write.schemaV2;
+import java.util.LinkedHashMap;
import org.apache.iotdb.tsfile.read.common.Path;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,13 @@ public class SchemaV2 {
private Map<Path, TimeseriesSchema> timeseriesSchemaMap;
/**
+ * template name -> (measurement -> TimeseriesSchema)
+ */
+ private Map<String, Map<String, TimeseriesSchema>> deviceTemplates;
+
+ private Map<String, String> devices; // device name -> template name
+
+ /**
* register a measurement schema map.
*/
public void registerTimeseries(Path path, TimeseriesSchema descriptor) {
@@ -57,10 +65,18 @@ public class SchemaV2 {
return timeseriesSchemaMap.get(path);
}
+ public boolean containsDevice(String device) {
+ return devices.containsKey(device);
+ }
+
+ public Map<Path, TimeseriesSchema> getTimeseriesSchemaMap() {
+ return timeseriesSchemaMap;
+ }
+
/**
* check if this schema contains a measurement named measurementId.
*/
- public boolean hasTimeseries(Path path) {
+ public boolean containsTimeseries(Path path) {
return timeseriesSchemaMap.containsKey(path);
}
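
The rename from hasTimeseries to containsTimeseries matches the new containsDevice. A minimal
check-then-register sketch, mirroring checkIsTimeSeriesExist in TsFileWriterV2 above (the
TimeseriesSchema constructor arguments are again an assumption):

    Path path = new Path("d1", "s1");
    if (!schema.containsTimeseries(path)) {
      schema.registerTimeseries(path, new TimeseriesSchema("s1", TSDataType.INT64));
    }
    TimeseriesSchema series = schema.getSeriesSchema(path);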
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
index 67b3559..a4a01a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
@@ -163,71 +163,4 @@ public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializa
return compressionType;
}
- /**
- * function for serializing data to output stream.
- */
- public int serializeTo(OutputStream outputStream) throws IOException {
- int byteLen = 0;
-
- byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
-
- byteLen += ReadWriteIOUtils.write(type, outputStream);
-
- byteLen += ReadWriteIOUtils.write(encoding, outputStream);
-
- byteLen += ReadWriteIOUtils.write(compressionType, outputStream);
-
- if (props == null) {
- byteLen += ReadWriteIOUtils.write(0, outputStream);
- } else {
- byteLen += ReadWriteIOUtils.write(props.size(), outputStream);
- for (Map.Entry<String, String> entry : props.entrySet()) {
- byteLen += ReadWriteIOUtils.write(entry.getKey(), outputStream);
- byteLen += ReadWriteIOUtils.write(entry.getValue(), outputStream);
- }
- }
-
- return byteLen;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TimeseriesSchema that = (TimeseriesSchema) o;
- return type == that.type && encoding == that.encoding && Objects
- .equals(measurementId, that.measurementId)
- && Objects.equals(compressionType, that.compressionType);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, encoding, measurementId, compressionType);
- }
-
- /**
- * compare by measurementID.
- */
- @Override
- public int compareTo(TimeseriesSchema o) {
- if (equals(o)) {
- return 0;
- } else {
- return this.measurementId.compareTo(o.measurementId);
- }
- }
-
- @Override
- public String toString() {
- StringContainer sc = new StringContainer("");
- sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",",
- props.toString(), ",",
- compressionType.toString());
- sc.addTail("]");
- return sc.toString();
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java
new file mode 100644
index 0000000..56db877
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.file.metadataV2.TsFileMetadataV2;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TSFileIOWriter is used to construct metadata and write data stored in memory to output stream.
+ */
+public class TsFileIOWriterV2 {
+
+ public static final byte[] magicStringBytes;
+ public static final byte[] versionNumberBytes;
+ protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+ private static final Logger logger = LoggerFactory.getLogger(TsFileIOWriterV2.class);
+
+ static {
+ magicStringBytes = BytesUtils.stringToBytes(TSFileConfig.MAGIC_STRING);
+ versionNumberBytes = TSFileConfig.VERSION_NUMBER.getBytes();
+ }
+
+ protected TsFileOutput out;
+ protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
+ protected boolean canWrite = true;
+ protected int totalChunkNum = 0;
+ protected int invalidChunkNum;
+ protected File file;
+ private List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
+ private ChunkMetaData currentChunkMetaData;
+ private long markedPosition;
+
+ /**
+ * empty construct function.
+ */
+ protected TsFileIOWriterV2() {
+
+ }
+
+ /**
+ * for writing a new tsfile.
+ *
+ * @param file be used to output written data
+ * @throws IOException if I/O error occurs
+ */
+ public TsFileIOWriterV2(File file) throws IOException {
+ this.out = new DefaultTsFileOutput(file);
+ startFile();
+ }
+
+ /**
+ * for writing a new tsfile.
+ *
+ * @param output be used to output written data
+ */
+ public TsFileIOWriterV2(TsFileOutput output) throws IOException {
+ this.out = output;
+ startFile();
+ }
+
+
+ /**
+ * Writes given bytes to output stream. This method is called when total memory size exceeds the
+ * chunk group size threshold.
+ *
+ * @param bytes - data of several pages which has been packed
+ * @throws IOException if an I/O error occurs.
+ */
+ public void writeBytesToStream(PublicBAOS bytes) throws IOException {
+ bytes.writeTo(out.wrapAsStream());
+ }
+
+ protected void startFile() throws IOException {
+ out.write(magicStringBytes);
+ out.write(versionNumberBytes);
+ }
+
+ /**
+ * start a {@linkplain ChunkMetaData ChunkMetaData}.
+ *
+ * @param timeseriesSchema - schema of this time series
+ * @param compressionCodecName - compression name of this time series
+ * @param tsDataType - data type
+ * @param encodingType - encoding of this time series
+ * @param statistics - chunk statistics
+ * @param dataSize - the serialized size of all pages
+ * @param numOfPages - the number of pages in this chunk
+ * @throws IOException if I/O error occurs
+ */
+ public void startFlushChunk(TimeseriesSchema timeseriesSchema, CompressionType compressionCodecName,
+ TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics,
+ int dataSize, int numOfPages) throws IOException {
+
+ currentChunkMetaData = new ChunkMetaData(timeseriesSchema.getMeasurementId(), tsDataType,
+ out.getPosition(), statistics);
+
+ // flush ChunkHeader to TsFileIOWriter
+ if (logger.isDebugEnabled()) {
+ logger.debug("start series chunk:{}, file position {}", timeseriesSchema, out.getPosition());
+ }
+
+ ChunkHeader header = new ChunkHeader(timeseriesSchema.getMeasurementId(), dataSize, tsDataType,
+ compressionCodecName, encodingType, numOfPages);
+ header.serializeTo(out.wrapAsStream());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("finish series chunk:{} header, file position {}", header, out.getPosition());
+ }
+ }
+
+ /**
+ * Write a whole chunk from another file into this file, providing fast merge for IoTDB.
+ */
+ public void writeChunk(Chunk chunk, ChunkMetaData chunkMetadata) throws IOException {
+ ChunkHeader chunkHeader = chunk.getHeader();
+ currentChunkMetaData = new ChunkMetaData(chunkHeader.getMeasurementID(),
+ chunkHeader.getDataType(), out.getPosition(), chunkMetadata.getStatistics());
+ chunkHeader.serializeTo(out.wrapAsStream());
+ out.write(chunk.getData());
+ endCurrentChunk();
+ logger.debug("end flushing a chunk:{}, totalvalue:{}", currentChunkMetaData, chunkMetadata.getNumOfPoints());
+ }
+
+ /**
+ * seal the current chunk and collect its metadata.
+ */
+ public void endCurrentChunk() {
+ chunkMetaDataList.add(currentChunkMetaData);
+ currentChunkMetaData = null;
+ totalChunkNum++;
+ }
+
+ /**
+ * write {@linkplain TsFileMetaData TSFileMetaData} to output stream and close it.
+ *
+ * @param schema Schema
+ * @throws IOException if I/O error occurs
+ */
+ public void endFile(SchemaV2 schema) throws IOException {
+
+ // serialize the SEPARATOR of MetaData and ChunkGroups
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+ // get all measurementSchema of this TsFile
+ Map<Path, TimeseriesSchema> schemaDescriptors = schema.getTimeseriesSchemaMap();
+ logger.debug("get time series list:{}", schemaDescriptors);
+
+ long[] tsOffsets = flushAllChunkMetadataList(this.chunkMetaDataList);
+
+ TsFileMetadataV2 tsFileMetaData = new TsFileMetadataV2(tsOffsets);
+
+ tsFileMetaData.setTotalChunkNum(totalChunkNum);
+ tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
+
+ long footerIndex = out.getPosition();
+ logger.debug("start to flush the footer,file pos:{}", footerIndex);
+
+ // write TsFileMetaData
+ int size = tsFileMetaData.serializeTo(out.wrapAsStream());
+ if (logger.isDebugEnabled()) {
+ logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
+ }
+
+ // write bloom filter
+ size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkGroupMetaDataList);
+ if (logger.isDebugEnabled()) {
+ logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
+ }
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(size, out.wrapAsStream());// write the size of the file metadata.
+
+ // write magic string
+ out.write(magicStringBytes);
+
+ // close file
+ out.close();
+ canWrite = false;
+ logger.info("output stream is closed");
+ }
+
+ /**
+ * 1. group the chunk metadata by series Path 2. flush the ChunkMetadata list of each series
+ * 3. collect the start offset of each series' metadata
+ *
+ * @param chunkMetaDataList all chunk metadata in memory
+ * @return the file offset of each series' metadata (not yet implemented; returns null for now)
+ */
+ private long[] flushAllChunkMetadataList(
+ List<ChunkMetaData> chunkMetaDataList) throws IOException {
+
+ // convert the chunk metadata list to a map of Path -> List<ChunkMetaData>
+ Map<Path, List<ChunkMetaData>> timeseriesMetadataList = new TreeMap<>();
+
+ // flush timeseriesMetadataList one by one
+
+ // collect the offsets into a long[] and return them
+
+ return null;
+ }
+
+ /**
+ * group all chunk metadata by series Path.
+ *
+ * @param chunkMetaDataList all chunk metadata
+ * @return the ChunkMetadata list of each timeseries
+ */
+ private TreeMap<Path, List<ChunkMetaData>> getAllTsMetadata(
+ List<ChunkMetaData> chunkMetaDataList) {
+ TreeMap<Path, List<ChunkMetaData>> timeseriesMetadataList = new TreeMap<>();
+
+ for (ChunkMetaData chunkMetaData: chunkMetaDataList) {
+
+ }
+ return timeseriesMetadataList;
+ }
+
+ /**
+ * get the length of normal OutputStream.
+ *
+ * @return - length of normal OutputStream
+ * @throws IOException if I/O error occurs
+ */
+ public long getPos() throws IOException {
+ return out.getPosition();
+ }
+
+
+ /**
+ * get chunkGroupMetaDataList.
+ *
+ * @return - List of chunkGroupMetaData
+ */
+ public List<ChunkGroupMetaData> getChunkGroupMetaDatas() {
+ return chunkGroupMetaDataList;
+ }
+
+ public boolean canWrite() {
+ return canWrite;
+ }
+
+ public void mark() throws IOException {
+ markedPosition = getPos();
+ }
+
+ public void reset() throws IOException {
+ out.truncate(markedPosition);
+ }
+
+ /**
+ * close the outputStream or file channel without writing FileMetadata. This is just used for
+ * Testing.
+ */
+ public void close() throws IOException {
+ canWrite = false;
+ out.close();
+ }
+
+ void writeSeparatorMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.SEPARATOR});
+ }
+
+ void writeChunkMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.CHUNK_HEADER});
+ }
+
+
+ public int getTotalChunkNum() {
+ return totalChunkNum;
+ }
+
+ public int getInvalidChunkNum() {
+ return invalidChunkNum;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ /**
+ * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
+ */
+ public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
+ Map<Path, Integer> startTimeIdxes = new HashMap<>();
+ chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+
+ Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
+ while (chunkGroupMetaDataIterator.hasNext()) {
+ ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+ String deviceId = chunkGroupMetaData.getDeviceID();
+ int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
+ Iterator<ChunkMetaData> chunkMetaDataIterator =
+ chunkGroupMetaData.getChunkMetaDataList().iterator();
+ while (chunkMetaDataIterator.hasNext()) {
+ ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
+ Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
+ int startTimeIdx = startTimeIdxes.get(path);
+
+ List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
+ boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
+ && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
+ if (!chunkValid) {
+ chunkMetaDataIterator.remove();
+ chunkNum--;
+ invalidChunkNum++;
+ } else {
+ startTimeIdxes.put(path, startTimeIdx + 1);
+ }
+ }
+ if (chunkNum == 0) {
+ chunkGroupMetaDataIterator.remove();
+ }
+ }
+ }
+
+ /**
+ * this function is only for Test.
+ *
+ * @return TsFileOutput
+ */
+ public TsFileOutput getIOWriterOut() {
+ return this.out;
+ }
+}
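
Since flushAllChunkMetadataList is still a stub, here is one possible shape it could take,
following its inline comments. This is a hedged sketch, not the committed implementation:
chunkMetaData.getPath() is a hypothetical accessor, since the commit does not yet show how a
chunk's series Path is recovered from its metadata, and serializeTo is assumed to behave as in
the V1 metadata classes.

    // group by series, flush each group, remember where each group starts
    private long[] flushAllChunkMetadataList(List<ChunkMetaData> chunkMetaDataList)
        throws IOException {
      Map<Path, List<ChunkMetaData>> timeseriesMetadata = new TreeMap<>();
      for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
        timeseriesMetadata
            .computeIfAbsent(chunkMetaData.getPath(), p -> new ArrayList<>()) // hypothetical getter
            .add(chunkMetaData);
      }
      long[] tsOffsets = new long[timeseriesMetadata.size()];
      int i = 0;
      for (List<ChunkMetaData> seriesChunks : timeseriesMetadata.values()) {
        tsOffsets[i++] = out.getPosition();                // start offset of this series' metadata
        for (ChunkMetaData chunkMetaData : seriesChunks) {
          chunkMetaData.serializeTo(out.wrapAsStream());   // assumed, as in V1 metadata classes
        }
      }
      return tsOffsets;
    }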