You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:22 UTC
[iotdb] 02/04: merge Vector
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56d92eb416866d86d8217d5a2f0de1b425b8237f
Merge: eb42aed 9c72690
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 14:05:44 2021 +0800
merge Vector
.../iotdb/cluster/client/DataClientProvider.java | 4 +-
.../cluster/client/async/AsyncClientPool.java | 34 ++-
.../iotdb/cluster/client/sync/SyncClientPool.java | 59 +++--
.../iotdb/cluster/client/sync/SyncDataClient.java | 9 +-
.../iotdb/cluster/client/sync/SyncMetaClient.java | 10 +-
.../exception/BadSeedUrlFormatException.java | 3 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 78 ++----
.../apache/iotdb/cluster/metadata/MetaPuller.java | 14 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 37 +--
.../cluster/query/aggregate/ClusterAggregator.java | 14 +-
.../cluster/query/fill/ClusterPreviousFill.java | 14 +-
.../query/groupby/RemoteGroupByExecutor.java | 27 +-
.../query/last/ClusterLastQueryExecutor.java | 14 +-
.../cluster/query/reader/ClusterReaderFactory.java | 14 +-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 12 +-
.../apache/iotdb/cluster/server/ClientServer.java | 11 +-
.../cluster/server/PullSnapshotHintService.java | 9 +-
.../cluster/server/service/BaseSyncService.java | 15 +-
.../cluster/server/service/DataSyncService.java | 7 +-
.../cluster/server/service/MetaSyncService.java | 5 +-
.../cluster/client/DataClientProviderTest.java | 136 ++++++++++
.../cluster/client/sync/SyncClientPoolTest.java | 17 +-
.../cluster/client/sync/SyncDataClientTest.java | 51 ++++
.../cluster/client/sync/SyncMetaClientTest.java | 47 ++++
server/src/assembly/resources/conf/iotdb-env.bat | 5 +
server/src/assembly/resources/conf/iotdb-env.sh | 3 +
.../iotdb/db/engine/flush/MemTableFlushTask.java | 12 +-
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 12 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 27 +-
.../iotdb/tsfile/file/header/ChunkHeader.java | 15 +-
.../file/metadata/statistics/Statistics.java | 94 ++-----
.../file/metadata/statistics/TimeStatistics.java | 161 ++++++++++++
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 4 +
.../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 18 +-
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 24 +-
.../iotdb/tsfile/write/chunk/IChunkWriter.java | 15 +-
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 255 ++++++++++++++++++
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
.../tsfile/write/chunk/VectorChunkWriterImpl.java | 202 ++++++++++++++
.../apache/iotdb/tsfile/write/page/PageWriter.java | 1 +
.../iotdb/tsfile/write/page/TimePageWriter.java | 177 +++++++++++++
.../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
.../write/record/datapoint/BooleanDataPoint.java | 2 +-
.../write/record/datapoint/DoubleDataPoint.java | 2 +-
.../write/record/datapoint/FloatDataPoint.java | 2 +-
.../write/record/datapoint/IntDataPoint.java | 2 +-
.../write/record/datapoint/LongDataPoint.java | 2 +-
.../write/record/datapoint/StringDataPoint.java | 2 +-
.../tsfile/write/schema/IMeasurementSchema.java | 34 ++-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 22 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 3 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 5 +-
.../tsfile/write/writer/TestTsFileOutput.java | 70 +++++
.../tsfile/write/writer/TimeChunkWriterTest.java | 111 ++++++++
.../tsfile/write/writer/TimePageWriterTest.java | 171 ++++++++++++
.../tsfile/write/writer/ValueChunkWriterTest.java | 109 ++++++++
.../tsfile/write/writer/ValuePageWriterTest.java | 291 +++++++++++++++++++++
.../write/writer/VectorChunkWriterImplTest.java | 178 +++++++++++++
.../write/writer/VectorMeasurementSchemaStub.java | 80 ++++++
59 files changed, 2722 insertions(+), 442 deletions(-)
diff --cc server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index dcd82ab,caa0893..c1bc5a9
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@@ -173,29 -173,23 +173,29 @@@ public class MemTableFlushTask
switch (dataType) {
case BOOLEAN:
- seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+ seriesWriterImpl.write(time, tvPairs.getBoolean(i), false);
break;
case INT32:
- seriesWriterImpl.write(time, tvPairs.getInt(i));
+ seriesWriterImpl.write(time, tvPairs.getInt(i), false);
break;
case INT64:
- seriesWriterImpl.write(time, tvPairs.getLong(i));
+ seriesWriterImpl.write(time, tvPairs.getLong(i), false);
break;
case FLOAT:
- seriesWriterImpl.write(time, tvPairs.getFloat(i));
+ seriesWriterImpl.write(time, tvPairs.getFloat(i), false);
break;
case DOUBLE:
- seriesWriterImpl.write(time, tvPairs.getDouble(i));
+ seriesWriterImpl.write(time, tvPairs.getDouble(i), false);
break;
case TEXT:
- seriesWriterImpl.write(time, tvPairs.getBinary(i));
+ seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
break;
+ case VECTOR:
+ // TODO:
+// for ( : tvPairs.getVector(i)) {
+// seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
+// }
+ break;
default:
LOGGER.error(
"Storage group {} does not support data type: {}", storageGroup, dataType);
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 1ec2001,1c1ba7f..759a751
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@@ -77,6 -77,8 +77,8 @@@ public abstract class Statistics<T>
return new DoubleStatistics();
case FLOAT:
return new FloatStatistics();
- case Vector:
++ case VECTOR:
+ return new TimeStatistics();
default:
throw new UnknownColumnTypeException(type.toString());
}
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
index 0000000,74bd701..e812166
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
@@@ -1,0 -1,161 +1,161 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+ import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.ByteBuffer;
+
+ public class TimeStatistics extends Statistics {
+
+ static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40;
+
+ @Override
+ public TSDataType getType() {
- return TSDataType.Vector;
++ return TSDataType.VECTOR;
+ }
+
+ @Override
+ public int getStatsSize() {
+ return 0;
+ }
+
+ @Override
+ public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+ throw new StatisticsClassException("Time statistics does not support: set min max from bytes");
+ }
+
+ @Override
+ public Long getMinValue() {
+ throw new StatisticsClassException("Time statistics does not support: min value");
+ }
+
+ @Override
+ public Long getMaxValue() {
+ throw new StatisticsClassException("Time statistics does not support: max value");
+ }
+
+ @Override
+ public Long getFirstValue() {
+ throw new StatisticsClassException("Time statistics does not support: first value");
+ }
+
+ @Override
+ public Long getLastValue() {
+ throw new StatisticsClassException("Time statistics does not support: last value");
+ }
+
+ @Override
+ public double getSumDoubleValue() {
+ throw new StatisticsClassException("Time statistics does not support: double sum");
+ }
+
+ @Override
+ public long getSumLongValue() {
+ throw new StatisticsClassException("Time statistics does not support: long sum");
+ }
+
+ @Override
+ void updateStats(long value) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ @Override
+ void updateStats(long[] values, int batchSize) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ @Override
+ public void updateStats(long minValue, long maxValue) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ @Override
+ public long calculateRamSize() {
+ return TIME_STATISTICS_FIXED_RAM_SIZE;
+ }
+
+ @Override
+ protected void mergeStatisticsValue(Statistics stats) {}
+
+ @Override
+ public byte[] getMinValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+ }
+
+ @Override
+ public byte[] getMaxValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get max value bytes");
+ }
+
+ @Override
+ public byte[] getFirstValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get first value bytes");
+ }
+
+ @Override
+ public byte[] getLastValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get last value bytes");
+ }
+
+ @Override
+ public byte[] getSumValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get sum value bytes");
+ }
+
+ @Override
+ public ByteBuffer getMinValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+ }
+
+ @Override
+ public ByteBuffer getMaxValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get max value buffer");
+ }
+
+ @Override
+ public ByteBuffer getFirstValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get first value buffer");
+ }
+
+ @Override
+ public ByteBuffer getLastValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get last value buffer");
+ }
+
+ @Override
+ public ByteBuffer getSumValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get sum value buffer");
+ }
+
+ @Override
+ public int serializeStats(OutputStream outputStream) {
+ return 0;
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {}
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {}
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 0000000,df9ded9..522eff5
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@@ -1,0 -1,255 +1,255 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.chunk;
+
+ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+ import org.apache.iotdb.tsfile.file.header.PageHeader;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.IOException;
+
+ public class TimeChunkWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
+
+ private final String measurementId;
+
+ private final TSEncoding encodingType;
+
+ private final CompressionType compressionType;
+
+ /** all pages of this chunk. */
+ private final PublicBAOS pageBuffer;
+
+ private int numOfPages;
+
+ /** write data into current page */
+ private TimePageWriter pageWriter;
+
+ /** page size threshold. */
+ private final long pageSizeThreshold;
+
+ private final int maxNumberOfPointsInPage;
+
+ /** value count in current page. */
+ private int valueCountInOnePageForNextCheck;
+
+ // initial value for valueCountInOnePageForNextCheck
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+
+ /** statistic of this chunk. */
+ private TimeStatistics statistics;
+
+ /** first page info */
+ private int sizeWithoutStatistic;
+
+ private Statistics<?> firstPageStatistics;
+
+ public TimeChunkWriter(
+ String measurementId,
+ CompressionType compressionType,
+ TSEncoding encodingType,
+ Encoder timeEncoder) {
+ this.measurementId = measurementId;
+ this.encodingType = encodingType;
+ this.compressionType = compressionType;
+ this.pageBuffer = new PublicBAOS();
+
+ this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // initial check of memory usage. So that we have enough data to make an initial prediction
+ this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+ // init statistics for this chunk and page
+ this.statistics = new TimeStatistics();
+
+ this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+ }
+
+ public void write(long time) {
+ pageWriter.write(time);
+ }
+
+ public void write(long[] timestamps, int batchSize) {
+ pageWriter.write(timestamps, batchSize);
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
+ * to pageBuffer
+ */
+ public boolean checkPageSizeAndMayOpenANewPage() {
+ if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
+ logger.debug("current line count reaches the upper bound, write page {}", measurementId);
+ return true;
+ } else if (pageWriter.getPointNumber()
+ >= valueCountInOnePageForNextCheck) { // need to check memory size
+ // not checking the memory used for every value
+ long currentPageSize = pageWriter.estimateMaxMemSize();
+ if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+ // we will write the current page
+ logger.debug(
+ "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
+ measurementId,
+ pageSizeThreshold,
+ currentPageSize,
+ pageWriter.getPointNumber());
+ valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ return true;
+ } else {
+ // reset the valueCountInOnePageForNextCheck for the next page
+ valueCountInOnePageForNextCheck =
+ (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
+ }
+ }
+ return false;
+ }
+
+ public void writePageToPageBuffer() {
+ try {
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = pageWriter.getStatistics();
+ this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+ firstPageStatistics = null;
+ } else {
+ pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+ }
+
+ // update statistics of this chunk
+ numOfPages++;
+ this.statistics.mergeStatistics(pageWriter.getStatistics());
+ } catch (IOException e) {
+ logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+ } finally {
+ // clear start time stamp for next initializing
+ pageWriter.reset();
+ }
+ }
+
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+ sealCurrentPage();
+ writeAllPagesOfChunkToTsFile(tsfileWriter);
+
+ // reinit this chunk writer
+ pageBuffer.reset();
+ numOfPages = 0;
+ firstPageStatistics = null;
+ this.statistics = new TimeStatistics();
+ }
+
+ public long estimateMaxSeriesMemSize() {
+ return pageBuffer.size()
+ + pageWriter.estimateMaxMemSize()
+ + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+ + pageWriter.getStatistics().getSerializedSize();
+ }
+
+ public long getCurrentChunkSize() {
+ if (pageBuffer.size() == 0) {
+ return 0;
+ }
+ // return the serialized size of the chunk header + all pages
+ return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+ + (long) pageBuffer.size();
+ }
+
+ public void sealCurrentPage() {
+ if (pageWriter != null && pageWriter.getPointNumber() > 0) {
+ writePageToPageBuffer();
+ }
+ }
+
+ public void clearPageWriter() {
+ pageWriter = null;
+ }
+
+ public int getNumOfPages() {
+ return numOfPages;
+ }
+
+ public TSDataType getDataType() {
- return TSDataType.Vector;
++ return TSDataType.VECTOR;
+ }
+
+ /**
+ * write the page to specified IOWriter.
+ *
+ * @param writer the specified IOWriter
+ * @throws IOException exception in IO
+ */
+ public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+ if (statistics.getCount() == 0) {
+ return;
+ }
+
+ // start to write this column chunk
+ writer.startFlushChunk(
+ measurementId,
+ compressionType,
- TSDataType.Vector,
++ TSDataType.VECTOR,
+ encodingType,
+ statistics,
+ pageBuffer.size(),
+ numOfPages,
+ 0x80);
+
+ long dataOffset = writer.getPos();
+
+ // write all pages of this column
+ writer.writeBytesToStream(pageBuffer);
+
+ int dataSize = (int) (writer.getPos() - dataOffset);
+ if (dataSize != pageBuffer.size()) {
+ throw new IOException(
+ "Bytes written is inconsistent with the size of data: "
+ + dataSize
+ + " !="
+ + " "
+ + pageBuffer.size());
+ }
+
+ writer.endCurrentChunk();
+ }
+
+ /** only used for test */
+ public PublicBAOS getPageBuffer() {
+ return pageBuffer;
+ }
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
index 0000000,af71ecd..8f1e907
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
@@@ -1,0 -1,202 +1,202 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.chunk;
+
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.Binary;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+
+ public class VectorChunkWriterImpl implements IChunkWriter {
+
+ private final TimeChunkWriter timeChunkWriter;
+ private final List<ValueChunkWriter> valueChunkWriterList;
+ private int valueIndex;
+
+ /** @param schema schema of this measurement */
+ public VectorChunkWriterImpl(IMeasurementSchema schema) {
+ timeChunkWriter =
+ new TimeChunkWriter(
+ schema.getMeasurementId(),
+ schema.getCompressor(),
+ schema.getTimeTSEncoding(),
+ schema.getTimeEncoder());
+
+ List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+ List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList();
+ List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList();
+ List<Encoder> valueEncoderList = schema.getValueEncoderList();
+
+ valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+ for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ valueMeasurementIdList.get(i),
+ schema.getCompressor(),
+ valueTSDataTypeList.get(i),
+ valueTSEncodingList.get(i),
+ valueEncoderList.get(i)));
+ }
+
+ this.valueIndex = 0;
+ }
+
+ @Override
+ public void write(long time, int value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, long value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, boolean value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, float value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, double value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, Binary value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time) {
+ valueIndex = 0;
+ timeChunkWriter.write(time);
+ if (checkPageSizeAndMayOpenANewPage()) {
+ writePageToPageBuffer();
+ }
+ }
+
+ // TODO tsfile write interface
+ @Override
+ public void write(long[] timestamps, int[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, long[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, float[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, double[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
+ * to pageBuffer
+ */
+ private boolean checkPageSizeAndMayOpenANewPage() {
+ return timeChunkWriter.checkPageSizeAndMayOpenANewPage();
+ }
+
+ private void writePageToPageBuffer() {
+ timeChunkWriter.writePageToPageBuffer();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+ timeChunkWriter.writeToFileWriter(tsfileWriter);
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writeToFileWriter(tsfileWriter);
+ }
+ }
+
+ @Override
+ public long estimateMaxSeriesMemSize() {
+ long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
+ }
+ return estimateMaxSeriesMemSize;
+ }
+
+ @Override
+ public long getCurrentChunkSize() {
+ long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ currentChunkSize += valueChunkWriter.getCurrentChunkSize();
+ }
+ return currentChunkSize;
+ }
+
+ @Override
+ public void sealCurrentPage() {
+ timeChunkWriter.sealCurrentPage();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.sealCurrentPage();
+ }
+ }
+
+ @Override
+ public void clearPageWriter() {
+ timeChunkWriter.clearPageWriter();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.clearPageWriter();
+ }
+ }
+
+ @Override
+ public int getNumOfPages() {
+ return timeChunkWriter.getNumOfPages();
+ }
+
+ @Override
+ public TSDataType getDataType() {
- return TSDataType.Vector;
++ return TSDataType.VECTOR;
+ }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
index 0000000,9968815..bdca8d5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@@ -1,0 -1,111 +1,111 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.writer;
+
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+
+ import org.junit.Test;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.fail;
+
+ public class TimeChunkWriterTest {
+
+ @Test
+ public void testWrite1() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ TimeChunkWriter chunkWriter =
+ new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+ for (long time = 1; time <= 10; time++) {
+ chunkWriter.write(time);
+ }
+ assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage());
+ chunkWriter.sealCurrentPage();
+ // page without statistics size: 82 + chunk header size: 8
+ assertEquals(90L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals(
+ (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
- assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(82, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testWrite2() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ TimeChunkWriter chunkWriter =
+ new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+ for (long time = 1; time <= 10; time++) {
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+ for (long time = 11; time <= 20; time++) {
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+ assertEquals(2, chunkWriter.getNumOfPages());
+ // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9
+ assertEquals(207L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
- assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(198, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
index 0000000,2ec6294..cab975c
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@@ -1,0 -1,171 +1,171 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.writer;
+
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.compress.IUnCompressor;
+ import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+
+ import org.junit.Test;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+
+ public class TimePageWriterTest {
+
+ @Test
+ public void testWrite() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ try {
+ pageWriter.write(1L);
+ assertEquals(8, pageWriter.estimateMaxMemSize());
+ ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+ pageWriter.reset();
+ assertEquals(0, pageWriter.estimateMaxMemSize());
+ byte[] timeBytes = new byte[8];
+ buffer.get(timeBytes);
+ ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes);
+ PlainDecoder decoder = new PlainDecoder();
+ assertEquals(1L, decoder.readLong(buffer2));
+ decoder.reset();
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // without page statistics
+ assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+ // total size
+ assertEquals(26, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ // compressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // with page statistics
+ assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+ // total size
+ assertEquals(43, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ // compressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ TimeStatistics testStatistics =
- (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector);
++ (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.VECTOR);
+ assertEquals(1L, testStatistics.getStartTime());
+ assertEquals(3L, testStatistics.getEndTime());
+ assertEquals(3, testStatistics.getCount());
+ assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // without page statistics
+ assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+
+ // total size
+ assertEquals(22, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+ // compressedSize
+ assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+ byte[] compress = new byte[20];
+ compressedBuffer.get(compress);
+ byte[] uncompress = new byte[24];
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+ unCompressor.uncompress(compress, 0, 20, uncompress, 0);
+ ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+ assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
index 0000000,93e18bb..3ca81b1
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
@@@ -1,0 -1,178 +1,178 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.writer;
+
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+
+ import org.junit.Test;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+
+ public class VectorChunkWriterImplTest {
+
+ @Test
+ public void testWrite1() {
+ VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+ VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+
+ chunkWriter.sealCurrentPage();
+ // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80;
+ // value chunk 2: 8 + 2 + 4 + 3 + 20; value chunk 3: 9 + 4 + 7 + 20 * 8;
+ assertEquals(492L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeToFileWriter(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // time chunk
+ assertEquals(
+ (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(164, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
- assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 164);
+
+ // value chunk 1
+ assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(89, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 89);
+
+ // value chunk 2
+ assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(29, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 29);
+
+ // value chunk 3
+ assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(171, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(171, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testWrite2() {
+ VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+ VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+ for (int time = 21; time <= 40; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+
+ // time chunk: 14 + (4 + 17 + 160) * 2
+ // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
+ // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
+ // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
+ assertEquals(1259L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeToFileWriter(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // time chunk
+ assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(362, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
- assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++ assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 362);
+
+ // value chunk 1
+ assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(260, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 260);
+
+ // value chunk 2
+ assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(140, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ buffer.position(buffer.position() + 140);
+
+ // value chunk 3
+ assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(456, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(456, buffer.remaining());
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
index 0000000,795a0a6..40335f5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
@@@ -1,0 -1,80 +1,80 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.tsfile.write.writer;
+
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+ import java.util.Arrays;
+ import java.util.List;
+
+ public class VectorMeasurementSchemaStub implements IMeasurementSchema {
+
+ @Override
+ public String getMeasurementId() {
+ return "s1.time";
+ }
+
+ @Override
+ public CompressionType getCompressor() {
+ return CompressionType.UNCOMPRESSED;
+ }
+
+ @Override
+ public TSDataType getType() {
- return TSDataType.Vector;
++ return TSDataType.VECTOR;
+ }
+
+ @Override
+ public TSEncoding getTimeTSEncoding() {
+ return TSEncoding.PLAIN;
+ }
+
+ @Override
+ public Encoder getTimeEncoder() {
+ return new PlainEncoder(TSDataType.INT64, 0);
+ }
+
+ @Override
+ public List<String> getValueMeasurementIdList() {
+ return Arrays.asList("s1", "s2", "s3");
+ }
+
+ @Override
+ public List<TSDataType> getValueTSDataTypeList() {
+ return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
+ }
+
+ @Override
+ public List<TSEncoding> getValueTSEncodingList() {
+ return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
+ }
+
+ @Override
+ public List<Encoder> getValueEncoderList() {
+ return Arrays.asList(
+ new PlainEncoder(TSDataType.FLOAT, 0),
+ new PlainEncoder(TSDataType.INT32, 0),
+ new PlainEncoder(TSDataType.DOUBLE, 0));
+ }
+ }