You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 01:10:35 UTC
[iotdb] branch Vector updated: [To Vector] Support vector write in tsfile (#2799)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch Vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/Vector by this push:
new 74cd5fb [To Vector] Support vector write in tsfile (#2799)
74cd5fb is described below
commit 74cd5fb95f8eedec1838d89b3595580e4ece9b84
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Mar 11 09:10:14 2021 +0800
[To Vector] Support vector write in tsfile (#2799)
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 12 +-
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 12 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 27 +-
.../iotdb/tsfile/file/header/ChunkHeader.java | 15 +-
.../tsfile/file/metadata/enums/TSDataType.java | 5 +-
.../file/metadata/statistics/Statistics.java | 94 ++-----
.../file/metadata/statistics/TimeStatistics.java | 161 ++++++++++++
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 4 +
.../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 60 +++++
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 24 +-
.../iotdb/tsfile/write/chunk/IChunkWriter.java | 15 +-
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 255 ++++++++++++++++++
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
.../tsfile/write/chunk/VectorChunkWriterImpl.java | 202 ++++++++++++++
.../apache/iotdb/tsfile/write/page/PageWriter.java | 1 +
.../iotdb/tsfile/write/page/TimePageWriter.java | 177 +++++++++++++
.../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
.../write/record/datapoint/BooleanDataPoint.java | 2 +-
.../write/record/datapoint/DoubleDataPoint.java | 2 +-
.../write/record/datapoint/FloatDataPoint.java | 2 +-
.../write/record/datapoint/IntDataPoint.java | 2 +-
.../write/record/datapoint/LongDataPoint.java | 2 +-
.../write/record/datapoint/StringDataPoint.java | 2 +-
.../tsfile/write/schema/IMeasurementSchema.java | 47 ++++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 22 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 3 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 5 +-
.../tsfile/write/writer/TestTsFileOutput.java | 52 ++++
.../tsfile/write/writer/TimeChunkWriterTest.java | 111 ++++++++
.../tsfile/write/writer/TimePageWriterTest.java | 171 ++++++++++++
.../tsfile/write/writer/ValueChunkWriterTest.java | 109 ++++++++
.../tsfile/write/writer/ValuePageWriterTest.java | 291 +++++++++++++++++++++
.../write/writer/VectorChunkWriterImplTest.java | 178 +++++++++++++
.../write/writer/VectorMeasurementSchemaStub.java | 80 ++++++
34 files changed, 2355 insertions(+), 202 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..caa0893 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -173,22 +173,22 @@ public class MemTableFlushTask {
switch (dataType) {
case BOOLEAN:
- seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+ seriesWriterImpl.write(time, tvPairs.getBoolean(i), false);
break;
case INT32:
- seriesWriterImpl.write(time, tvPairs.getInt(i));
+ seriesWriterImpl.write(time, tvPairs.getInt(i), false);
break;
case INT64:
- seriesWriterImpl.write(time, tvPairs.getLong(i));
+ seriesWriterImpl.write(time, tvPairs.getLong(i), false);
break;
case FLOAT:
- seriesWriterImpl.write(time, tvPairs.getFloat(i));
+ seriesWriterImpl.write(time, tvPairs.getFloat(i), false);
break;
case DOUBLE:
- seriesWriterImpl.write(time, tvPairs.getDouble(i));
+ seriesWriterImpl.write(time, tvPairs.getDouble(i), false);
break;
case TEXT:
- seriesWriterImpl.write(time, tvPairs.getBinary(i));
+ seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
break;
default:
LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index b7e0842..b6e2316 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -412,22 +412,22 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
switch (schema.getType()) {
case INT32:
- chunkWriter.write(time, (int) value);
+ chunkWriter.write(time, (int) value, false);
break;
case INT64:
- chunkWriter.write(time, (long) value);
+ chunkWriter.write(time, (long) value, false);
break;
case FLOAT:
- chunkWriter.write(time, (float) value);
+ chunkWriter.write(time, (float) value, false);
break;
case DOUBLE:
- chunkWriter.write(time, (double) value);
+ chunkWriter.write(time, (double) value, false);
break;
case BOOLEAN:
- chunkWriter.write(time, (boolean) value);
+ chunkWriter.write(time, (boolean) value, false);
break;
case TEXT:
- chunkWriter.write(time, (Binary) value);
+ chunkWriter.write(time, (Binary) value, false);
break;
default:
throw new UnSupportedDataTypeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 973cb24..2dbaca3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -53,22 +53,25 @@ public class MergeUtils {
public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
switch (chunkWriter.getDataType()) {
case TEXT:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+ chunkWriter.write(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary(), false);
break;
case DOUBLE:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+ chunkWriter.write(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble(), false);
break;
case BOOLEAN:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+ chunkWriter.write(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean(), false);
break;
case INT64:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+ chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong(), false);
break;
case INT32:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+ chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt(), false);
break;
case FLOAT:
- chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+ chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat(), false);
break;
default:
throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
@@ -109,22 +112,22 @@ public class MergeUtils {
public static void writeBatchPoint(BatchData batchData, int i, IChunkWriter chunkWriter) {
switch (chunkWriter.getDataType()) {
case TEXT:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i), false);
break;
case DOUBLE:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), false);
break;
case BOOLEAN:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i), false);
break;
case INT64:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i), false);
break;
case INT32:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i), false);
break;
case FLOAT:
- chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i));
+ chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i), false);
break;
default:
throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 663da60..48c9134 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -58,8 +58,21 @@ public class ChunkHeader {
CompressionType compressionType,
TSEncoding encoding,
int numOfPages) {
+ this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0);
+ }
+
+ public ChunkHeader(
+ String measurementID,
+ int dataSize,
+ TSDataType dataType,
+ CompressionType compressionType,
+ TSEncoding encoding,
+ int numOfPages,
+ int mask) {
this(
- numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER,
+ (byte)
+ ((numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER)
+ | mask),
measurementID,
dataSize,
getSerializedSize(measurementID, dataSize),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 0edda45..a2b7eef 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -41,7 +41,10 @@ public enum TSDataType {
DOUBLE((byte) 4),
/** TEXT */
- TEXT((byte) 5);
+ TEXT((byte) 5),
+
+ /** Vector */
+ Vector((byte) 6);
private final byte type;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 1ec2001..1c1ba7f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -77,6 +77,8 @@ public abstract class Statistics<T> {
return new DoubleStatistics();
case FLOAT:
return new FloatStatistics();
+ case Vector:
+ return new TimeStatistics();
default:
throw new UnknownColumnTypeException(type.toString());
}
@@ -189,61 +191,36 @@ public abstract class Statistics<T> {
}
public void update(long time, boolean value) {
- if (time < this.startTime) {
- startTime = time;
- }
- if (time > this.endTime) {
- endTime = time;
- }
- count++;
+ update(time);
updateStats(value);
}
public void update(long time, int value) {
- if (time < this.startTime) {
- startTime = time;
- }
- if (time > this.endTime) {
- endTime = time;
- }
- count++;
+ update(time);
updateStats(value);
}
public void update(long time, long value) {
- if (time < this.startTime) {
- startTime = time;
- }
- if (time > this.endTime) {
- endTime = time;
- }
- count++;
+ update(time);
updateStats(value);
}
public void update(long time, float value) {
- if (time < this.startTime) {
- startTime = time;
- }
- if (time > this.endTime) {
- endTime = time;
- }
- count++;
+ update(time);
updateStats(value);
}
public void update(long time, double value) {
- if (time < this.startTime) {
- startTime = time;
- }
- if (time > this.endTime) {
- endTime = time;
- }
- count++;
+ update(time);
updateStats(value);
}
public void update(long time, Binary value) {
+ update(time);
+ updateStats(value);
+ }
+
+ public void update(long time) {
if (time < startTime) {
startTime = time;
}
@@ -251,65 +228,39 @@ public abstract class Statistics<T> {
endTime = time;
}
count++;
- updateStats(value);
}
public void update(long[] time, boolean[] values, int batchSize) {
- if (time[0] < startTime) {
- startTime = time[0];
- }
- if (time[batchSize - 1] > this.endTime) {
- endTime = time[batchSize - 1];
- }
- count += batchSize;
+ update(time, batchSize);
updateStats(values, batchSize);
}
public void update(long[] time, int[] values, int batchSize) {
- if (time[0] < startTime) {
- startTime = time[0];
- }
- if (time[batchSize - 1] > this.endTime) {
- endTime = time[batchSize - 1];
- }
- count += batchSize;
+ update(time, batchSize);
updateStats(values, batchSize);
}
public void update(long[] time, long[] values, int batchSize) {
- if (time[0] < startTime) {
- startTime = time[0];
- }
- if (time[batchSize - 1] > this.endTime) {
- endTime = time[batchSize - 1];
- }
- count += batchSize;
+ update(time, batchSize);
updateStats(values, batchSize);
}
public void update(long[] time, float[] values, int batchSize) {
- if (time[0] < startTime) {
- startTime = time[0];
- }
- if (time[batchSize - 1] > this.endTime) {
- endTime = time[batchSize - 1];
- }
- count += batchSize;
+ update(time, batchSize);
updateStats(values, batchSize);
}
public void update(long[] time, double[] values, int batchSize) {
- if (time[0] < startTime) {
- startTime = time[0];
- }
- if (time[batchSize - 1] > this.endTime) {
- endTime = time[batchSize - 1];
- }
- count += batchSize;
+ update(time, batchSize);
updateStats(values, batchSize);
}
public void update(long[] time, Binary[] values, int batchSize) {
+ update(time, batchSize);
+ updateStats(values, batchSize);
+ }
+
+ public void update(long[] time, int batchSize) {
if (time[0] < startTime) {
startTime = time[0];
}
@@ -317,7 +268,6 @@ public abstract class Statistics<T> {
endTime = time[batchSize - 1];
}
count += batchSize;
- updateStats(values, batchSize);
}
protected abstract void mergeStatisticsValue(Statistics stats);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
new file mode 100644
index 0000000..74bd701
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Statistics implementation that reports {@link TSDataType#Vector} as its type. It carries no
+ * value statistics of its own: the serialized value part is empty ({@link #getStatsSize()} and
+ * {@link #serializeStats(OutputStream)} both return 0, deserialization reads nothing), and every
+ * value-based accessor or update throws a {@link StatisticsClassException}.
+ */
+public class TimeStatistics extends Statistics {
+
+ static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40;
+
+ @Override
+ public TSDataType getType() {
+ return TSDataType.Vector;
+ }
+
+ /** @return 0, because no value statistics are serialized */
+ @Override
+ public int getStatsSize() {
+ return 0;
+ }
+
+ @Override
+ public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+ throw new StatisticsClassException("Time statistics does not support: set min max from bytes");
+ }
+
+ @Override
+ public Long getMinValue() {
+ throw new StatisticsClassException("Time statistics does not support: min value");
+ }
+
+ @Override
+ public Long getMaxValue() {
+ throw new StatisticsClassException("Time statistics does not support: max value");
+ }
+
+ @Override
+ public Long getFirstValue() {
+ throw new StatisticsClassException("Time statistics does not support: first value");
+ }
+
+ @Override
+ public Long getLastValue() {
+ throw new StatisticsClassException("Time statistics does not support: last value");
+ }
+
+ @Override
+ public double getSumDoubleValue() {
+ throw new StatisticsClassException("Time statistics does not support: double sum");
+ }
+
+ @Override
+ public long getSumLongValue() {
+ throw new StatisticsClassException("Time statistics does not support: long sum");
+ }
+
+ @Override
+ void updateStats(long value) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ @Override
+ void updateStats(long[] values, int batchSize) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ @Override
+ public void updateStats(long minValue, long maxValue) {
+ throw new StatisticsClassException("Time statistics does not support: update stats");
+ }
+
+ /** @return the fixed estimate {@link #TIME_STATISTICS_FIXED_RAM_SIZE} */
+ @Override
+ public long calculateRamSize() {
+ return TIME_STATISTICS_FIXED_RAM_SIZE;
+ }
+
+ /** Intentionally empty: there are no value statistics to merge. */
+ @Override
+ protected void mergeStatisticsValue(Statistics stats) {}
+
+ @Override
+ public byte[] getMinValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+ }
+
+ @Override
+ public byte[] getMaxValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get max value bytes");
+ }
+
+ @Override
+ public byte[] getFirstValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get first value bytes");
+ }
+
+ @Override
+ public byte[] getLastValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get last value bytes");
+ }
+
+ @Override
+ public byte[] getSumValueBytes() {
+ throw new StatisticsClassException("Time statistics does not support: get sum value bytes");
+ }
+
+ @Override
+ public ByteBuffer getMinValueBuffer() {
+ // message says "buffer" like the other *Buffer accessors, so the failing call is identifiable
+ throw new StatisticsClassException("Time statistics does not support: get min value buffer");
+ }
+
+ @Override
+ public ByteBuffer getMaxValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get max value buffer");
+ }
+
+ @Override
+ public ByteBuffer getFirstValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get first value buffer");
+ }
+
+ @Override
+ public ByteBuffer getLastValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get last value buffer");
+ }
+
+ @Override
+ public ByteBuffer getSumValueBuffer() {
+ throw new StatisticsClassException("Time statistics does not support: get sum value buffer");
+ }
+
+ /** Writes nothing to {@code outputStream}. @return 0, the number of bytes written */
+ @Override
+ public int serializeStats(OutputStream outputStream) {
+ return 0;
+ }
+
+ /** Intentionally empty: nothing is read from the stream. */
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {}
+
+ /** Intentionally empty: nothing is read from the buffer. */
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {}
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 881db03..61efe8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -86,4 +86,8 @@ public class PublicBAOS extends ByteArrayOutputStream {
public int size() {
return count;
}
+
+ /**
+ * Shrinks the stream so that only its first {@code size} bytes remain valid. Only the valid-byte
+ * counter is changed.
+ *
+ * @param size new number of valid bytes; must be between 0 and the current size, inclusive
+ * @throws IllegalArgumentException if {@code size} is negative or larger than the current size
+ */
+ public void truncate(int size) {
+ // reject sizes that would leave the counter outside [0, current size]; failing here keeps
+ // the error next to the bad call instead of surfacing in a later read or write
+ if (size < 0 || size > count) {
+ throw new IllegalArgumentException(
+ "Cannot truncate to " + size + ", current size is " + count);
+ }
+ count = size;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 73b01d2..6c00f43 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -45,6 +45,8 @@ public abstract class TsPrimitiveType implements Serializable {
return new TsPrimitiveType.TsDouble((double) v);
case TEXT:
return new TsPrimitiveType.TsBinary((Binary) v);
+ case Vector:
+ return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v);
default:
throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
}
@@ -74,6 +76,10 @@ public abstract class TsPrimitiveType implements Serializable {
throw new UnsupportedOperationException("getBinary() is not supported for current sub-class");
}
+ /**
+ * Returns the vector value. This base implementation does not hold one and always throws.
+ *
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ public TsPrimitiveType[] getVector() {
+ // the message names this accessor so the failure points at the right call
+ throw new UnsupportedOperationException("getVector() is not supported for current sub-class");
+ }
+
public void setBoolean(boolean val) {
throw new UnsupportedOperationException("setBoolean() is not supported for current sub-class");
}
@@ -98,6 +104,10 @@ public abstract class TsPrimitiveType implements Serializable {
throw new UnsupportedOperationException("setBinary() is not supported for current sub-class");
}
+ /**
+ * Sets the vector value. This base implementation does not hold one and always throws.
+ *
+ * @param val the vector value to store
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ public void setVector(TsPrimitiveType[] val) {
+ // the message names this mutator so the failure points at the right call
+ throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
+ }
+
/**
* get the size of one instance of current class.
*
@@ -462,4 +472,54 @@ public abstract class TsPrimitiveType implements Serializable {
return false;
}
}
+
+ /** A {@link TsPrimitiveType} whose value is an array of other {@link TsPrimitiveType}s. */
+ public static class TsVector extends TsPrimitiveType {
+
+ private TsPrimitiveType[] value;
+
+ public TsVector(TsPrimitiveType[] value) {
+ this.value = value;
+ }
+
+ @Override
+ public TsPrimitiveType[] getVector() {
+ return value;
+ }
+
+ @Override
+ public void setVector(TsPrimitiveType[] val) {
+ this.value = val;
+ }
+
+ /** @return the sum of the elements' sizes plus a fixed overhead of 8 */
+ @Override
+ public int getSize() {
+ int size = 0;
+ for (TsPrimitiveType type : value) {
+ size += type.getSize();
+ }
+ // object header + array object header
+ return 4 + 4 + size;
+ }
+
+ @Override
+ public Object getValue() {
+ return getVector();
+ }
+
+ /**
+ * Renders the elements as {@code [e0, e1, ...]}, each element exactly once; an empty vector
+ * renders as {@code []}.
+ */
+ @Override
+ public String getStringValue() {
+ StringBuilder builder = new StringBuilder("[");
+ for (int i = 0; i < value.length; i++) {
+ // separator goes before every element except the first
+ if (i > 0) {
+ builder.append(", ");
+ }
+ builder.append(value[i].getStringValue());
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return TSDataType.Vector;
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 2c00f4a..2beda20 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -145,7 +145,7 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long time, long value) {
+ public void write(long time, long value, boolean isNull) {
// store last point for sdtEncoding, it still needs to go through encoding process
// in case it exceeds compdev and needs to store second last point
if (!isSdtEncoding || sdtEncoder.encodeLong(time, value)) {
@@ -160,7 +160,7 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long time, int value) {
+ public void write(long time, int value, boolean isNull) {
if (!isSdtEncoding || sdtEncoder.encodeInt(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -173,13 +173,13 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long time, boolean value) {
+ public void write(long time, boolean value, boolean isNull) {
pageWriter.write(time, value);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long time, float value) {
+ public void write(long time, float value, boolean isNull) {
if (!isSdtEncoding || sdtEncoder.encodeFloat(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -193,7 +193,7 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long time, double value) {
+ public void write(long time, double value, boolean isNull) {
if (!isSdtEncoding || sdtEncoder.encodeDouble(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -206,12 +206,17 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long time, Binary value) {
+ public void write(long time, Binary value, boolean isNull) {
pageWriter.write(time, value);
checkPageSizeAndMayOpenANewPage();
}
@Override
+ public void write(long time) {
+ throw new IllegalStateException("write time method is not implemented in common chunk writer");
+ }
+
+ @Override
public void write(long[] timestamps, int[] values, int batchSize) {
if (isSdtEncoding) {
batchSize = sdtEncoder.encode(timestamps, values, batchSize);
@@ -412,7 +417,7 @@ public class ChunkWriterImpl implements IChunkWriter {
* @param statistics the chunk statistics
* @throws IOException exception in IO
*/
- public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics)
+ private void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics)
throws IOException {
if (statistics.getCount() == 0) {
return;
@@ -420,13 +425,14 @@ public class ChunkWriterImpl implements IChunkWriter {
// start to write this column chunk
writer.startFlushChunk(
- measurementSchema,
+ measurementSchema.getMeasurementId(),
compressor.getType(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
statistics,
pageBuffer.size(),
- numOfPages);
+ numOfPages,
+ 0);
long dataOffset = writer.getPos();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
index cab9615..963ef64 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
@@ -28,22 +28,25 @@ import java.io.IOException;
public interface IChunkWriter {
/** write a time value pair. */
- void write(long time, int value);
+ void write(long time, int value, boolean isNull);
/** write a time value pair. */
- void write(long time, long value);
+ void write(long time, long value, boolean isNull);
/** write a time value pair. */
- void write(long time, boolean value);
+ void write(long time, boolean value, boolean isNull);
/** write a time value pair. */
- void write(long time, float value);
+ void write(long time, float value, boolean isNull);
/** write a time value pair. */
- void write(long time, double value);
+ void write(long time, double value, boolean isNull);
/** write a time value pair. */
- void write(long time, Binary value);
+ void write(long time, Binary value, boolean isNull);
+
+ /** write a time. */
+ void write(long time);
/** write time series */
void write(long[] timestamps, int[] values, int batchSize);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
new file mode 100644
index 0000000..df9ded9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Buffers timestamps into pages through a {@link TimePageWriter}, collects the finished pages in
+ * an in-memory buffer and flushes them to a {@link TsFileIOWriter} as one chunk whose data type
+ * is {@link TSDataType#Vector}.
+ *
+ * <p>{@link #write(long)} and {@link #write(long[], int)} never seal a page themselves; the
+ * caller asks {@link #checkPageSizeAndMayOpenANewPage()} and then calls
+ * {@link #writePageToPageBuffer()} or {@link #sealCurrentPage()}.
+ */
+public class TimeChunkWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
+
+ private final String measurementId;
+
+ private final TSEncoding encodingType;
+
+ private final CompressionType compressionType;
+
+ /** all pages of this chunk. */
+ private final PublicBAOS pageBuffer;
+
+ private int numOfPages;
+
+ /** write data into current page */
+ private TimePageWriter pageWriter;
+
+ /** page size threshold. */
+ private final long pageSizeThreshold;
+
+ private final int maxNumberOfPointsInPage;
+
+ /** point count at which the memory size of the current page is checked next. */
+ private int valueCountInOnePageForNextCheck;
+
+ // initial value for valueCountInOnePageForNextCheck
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+
+ /** statistic of this chunk. */
+ private TimeStatistics statistics;
+
+ /** first page info */
+ private int sizeWithoutStatistic;
+
+ private Statistics<?> firstPageStatistics;
+
+ /**
+ * Creates a writer with an empty page buffer; the page size threshold and the maximum number of
+ * points per page are read from the TsFile configuration.
+ *
+ * @param measurementId id written into the chunk header and used in log messages
+ * @param compressionType compression applied by the page writer and recorded in the header
+ * @param encodingType encoding recorded in the chunk header
+ * @param timeEncoder encoder handed to the page writer
+ */
+ public TimeChunkWriter(
+ String measurementId,
+ CompressionType compressionType,
+ TSEncoding encodingType,
+ Encoder timeEncoder) {
+ this.measurementId = measurementId;
+ this.encodingType = encodingType;
+ this.compressionType = compressionType;
+ this.pageBuffer = new PublicBAOS();
+
+ this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // initial check of memory usage. So that we have enough data to make an initial prediction
+ this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+ // init statistics for this chunk and page
+ this.statistics = new TimeStatistics();
+
+ this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+ }
+
+ /** Appends one timestamp to the current page. */
+ public void write(long time) {
+ pageWriter.write(time);
+ }
+
+ /** Hands {@code timestamps} and {@code batchSize} to the current page writer. */
+ public void write(long[] timestamps, int batchSize) {
+ pageWriter.write(timestamps, batchSize);
+ }
+
+ /**
+ * Checks whether the current page has reached the point-count limit or, once enough points
+ * have been written, whether its estimated memory size exceeds the page size threshold. This
+ * method only reports the result; it does not seal the page itself.
+ *
+ * @return {@code true} if the current page should be written to the page buffer
+ */
+ public boolean checkPageSizeAndMayOpenANewPage() {
+ // >= rather than ==: a batch write can step past the limit without ever landing on it
+ if (pageWriter.getPointNumber() >= maxNumberOfPointsInPage) {
+ logger.debug("current line count reaches the upper bound, write page {}", measurementId);
+ return true;
+ } else if (pageWriter.getPointNumber()
+ >= valueCountInOnePageForNextCheck) { // need to check memory size
+ // not checking the memory used for every value
+ long currentPageSize = pageWriter.estimateMaxMemSize();
+ if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+ // we will write the current page
+ logger.debug(
+ "enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
+ measurementId,
+ pageSizeThreshold,
+ currentPageSize,
+ pageWriter.getPointNumber());
+ valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ return true;
+ } else {
+ // reset the valueCountInOnePageForNextCheck for the next page
+ valueCountInOnePageForNextCheck =
+ (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Serializes the current page into the page buffer and merges its statistics into the chunk
+ * statistics. The first page is written without its statistics; when a second page arrives the
+ * buffer is rebuilt with the first page's statistics inserted. An {@link IOException} is logged
+ * and the page is dropped; the page writer is reset in every case.
+ */
+ public void writePageToPageBuffer() {
+ try {
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = pageWriter.getStatistics();
+ this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+ firstPageStatistics = null;
+ } else {
+ pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+ }
+
+ // update statistics of this chunk
+ numOfPages++;
+ this.statistics.mergeStatistics(pageWriter.getStatistics());
+ } catch (IOException e) {
+ logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+ } finally {
+ // clear start time stamp for next initializing
+ pageWriter.reset();
+ }
+ }
+
+ /**
+ * Seals the current page, writes the whole chunk to {@code tsfileWriter} and resets this writer
+ * (page buffer, page count, first-page statistics, chunk statistics) for the next chunk.
+ *
+ * @throws IOException if writing the chunk fails
+ */
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+ sealCurrentPage();
+ writeAllPagesOfChunkToTsFile(tsfileWriter);
+
+ // reinit this chunk writer
+ pageBuffer.reset();
+ numOfPages = 0;
+ firstPageStatistics = null;
+ this.statistics = new TimeStatistics();
+ }
+
+ /**
+ * @return the buffered page bytes plus the page writer's estimated memory size, the estimated
+ * page header size without statistics and the serialized size of the page statistics
+ */
+ public long estimateMaxSeriesMemSize() {
+ return pageBuffer.size()
+ + pageWriter.estimateMaxMemSize()
+ + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+ + pageWriter.getStatistics().getSerializedSize();
+ }
+
+ /** @return 0 if no page is buffered, otherwise the chunk header size plus all buffered pages */
+ public long getCurrentChunkSize() {
+ if (pageBuffer.size() == 0) {
+ return 0;
+ }
+ // return the serialized size of the chunk header + all pages
+ return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+ + (long) pageBuffer.size();
+ }
+
+ /** Writes the current page to the page buffer if a page writer exists and holds points. */
+ public void sealCurrentPage() {
+ if (pageWriter != null && pageWriter.getPointNumber() > 0) {
+ writePageToPageBuffer();
+ }
+ }
+
+ /** Drops the page writer; methods that use it without a null check must not be called after. */
+ public void clearPageWriter() {
+ pageWriter = null;
+ }
+
+ public int getNumOfPages() {
+ return numOfPages;
+ }
+
+ public TSDataType getDataType() {
+ return TSDataType.Vector;
+ }
+
+ /**
+ * write the page to specified IOWriter. Does nothing when the chunk statistics hold no points.
+ *
+ * @param writer the specified IOWriter
+ * @throws IOException exception in IO
+ */
+ public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+ if (statistics.getCount() == 0) {
+ return;
+ }
+
+ // start to write this column chunk; 0x80 is passed as the chunk header mask
+ writer.startFlushChunk(
+ measurementId,
+ compressionType,
+ TSDataType.Vector,
+ encodingType,
+ statistics,
+ pageBuffer.size(),
+ numOfPages,
+ 0x80);
+
+ long dataOffset = writer.getPos();
+
+ // write all pages of this column
+ writer.writeBytesToStream(pageBuffer);
+
+ int dataSize = (int) (writer.getPos() - dataOffset);
+ if (dataSize != pageBuffer.size()) {
+ throw new IOException(
+ "Bytes written is inconsistent with the size of data: "
+ + dataSize
+ + " !="
+ + " "
+ + pageBuffer.size());
+ }
+
+ writer.endCurrentChunk();
+ }
+
+ /** only used for test */
+ public PublicBAOS getPageBuffer() {
+ return pageBuffer;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
new file mode 100644
index 0000000..17e137d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+ /**
+  * Writes the chunk of one value column. Points are buffered in a {@link ValuePageWriter}; sealed
+  * pages are collected in an in-memory page buffer and finally flushed to a {@link TsFileIOWriter}
+  * as a single chunk.
+  */
+ public class ValueChunkWriter {
+
+   private static final Logger logger = LoggerFactory.getLogger(ValueChunkWriter.class);
+
+   private final String measurementId;
+
+   private final TSEncoding encodingType;
+
+   private final TSDataType dataType;
+
+   private final CompressionType compressionType;
+
+   /** all pages of this chunk. */
+   private final PublicBAOS pageBuffer;
+
+   /** number of pages sealed into {@link #pageBuffer} since the last flush */
+   private int numOfPages;
+
+   /** write data into current page */
+   private ValuePageWriter pageWriter;
+
+   /** statistic of this chunk. */
+   private Statistics<?> statistics;
+
+   /** number of bytes the first page header occupied when it was written without statistics */
+   private int sizeWithoutStatistic;
+
+   /** statistics of the first page, kept aside until a second page is sealed */
+   private Statistics<?> firstPageStatistics;
+
+   /**
+    * @param measurementId id of the value column
+    * @param compressionType compression used for the pages of this chunk
+    * @param dataType data type of the values
+    * @param encodingType encoding recorded in the chunk header
+    * @param valueEncoder encoder handed to the page writer
+    */
+   public ValueChunkWriter(
+       String measurementId,
+       CompressionType compressionType,
+       TSDataType dataType,
+       TSEncoding encodingType,
+       Encoder valueEncoder) {
+     this.measurementId = measurementId;
+     this.encodingType = encodingType;
+     this.dataType = dataType;
+     this.compressionType = compressionType;
+     this.pageBuffer = new PublicBAOS();
+
+     // init statistics for this chunk and page
+     this.statistics = Statistics.getStatsByType(dataType);
+
+     this.pageWriter =
+         new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, long value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, int value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, boolean value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, float value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, double value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   /** Writes one point into the current page; {@code isNull} marks the slot as empty. */
+   public void write(long time, Binary value, boolean isNull) {
+     pageWriter.write(time, value, isNull);
+   }
+
+   // Batch writes below go point by point through the single-point path with isNull = false.
+   // The batch overloads of ValuePageWriter only encode the values and update the statistics;
+   // they never touch the page's bitmap or point counter, so a page filled through them would
+   // report getSize() == 0 and be skipped by sealCurrentPage().
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, int[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, long[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, boolean[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, float[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, double[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /** Writes the first {@code batchSize} points of the arrays as non-null values. */
+   public void write(long[] timestamps, Binary[] values, int batchSize) {
+     for (int i = 0; i < batchSize; i++) {
+       pageWriter.write(timestamps[i], values[i], false);
+     }
+   }
+
+   /**
+    * Seals the current page into the page buffer, merges its statistics into the chunk statistics
+    * and resets the page writer.
+    *
+    * <p>The first page is written without its statistics. When a second page arrives, the buffer
+    * is rebuilt so that the first page's statistics are inserted right after its size fields.
+    * Presumably this is because a chunk with a single page does not need per-page statistics;
+    * confirm against the TsFile format description.
+    *
+    * <p>An {@link IOException} from the page writer is logged and the page is dropped.
+    */
+   public void writePageToPageBuffer() {
+     try {
+       if (numOfPages == 0) { // record the firstPageStatistics
+         this.firstPageStatistics = pageWriter.getStatistics();
+         this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+       } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+         byte[] b = pageBuffer.toByteArray();
+         pageBuffer.reset();
+         // size fields of the first page, then its statistics, then the rest of the first page
+         pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+         firstPageStatistics.serialize(pageBuffer);
+         pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+         firstPageStatistics = null;
+       } else {
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+       }
+
+       // update statistics of this chunk
+       numOfPages++;
+       this.statistics.mergeStatistics(pageWriter.getStatistics());
+     } catch (IOException e) {
+       logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+     } finally {
+       // clear start time stamp for next initializing
+       pageWriter.reset(dataType);
+     }
+   }
+
+   /**
+    * Seals the current page, flushes the whole chunk to the given writer and re-initializes this
+    * chunk writer for the next chunk.
+    *
+    * @throws IOException if flushing the chunk fails
+    */
+   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+     sealCurrentPage();
+     writeAllPagesOfChunkToTsFile(tsfileWriter);
+
+     // reinit this chunk writer
+     pageBuffer.reset();
+     numOfPages = 0;
+     firstPageStatistics = null;
+     this.statistics = Statistics.getStatsByType(dataType);
+   }
+
+   /**
+    * Estimates the memory held by this writer: the page buffer, the page writer's own estimate, a
+    * page header without statistics and the serialized size of the current page statistics.
+    */
+   public long estimateMaxSeriesMemSize() {
+     return pageBuffer.size()
+         + pageWriter.estimateMaxMemSize()
+         + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+         + pageWriter.getStatistics().getSerializedSize();
+   }
+
+   /** Returns the serialized size of the chunk header plus all sealed pages, or 0 if none. */
+   public long getCurrentChunkSize() {
+     if (pageBuffer.size() == 0) {
+       return 0;
+     }
+     // return the serialized size of the chunk header + all pages
+     return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+         + (long) pageBuffer.size();
+   }
+
+   /** Seals the current page if the page writer exists and has received at least one slot. */
+   public void sealCurrentPage() {
+     // the check is on the slot count, not on the statistics: a page that only holds null slots
+     // contains no points but still has to be serialized
+     if (pageWriter != null && pageWriter.getSize() != 0) {
+       writePageToPageBuffer();
+     }
+   }
+
+   /** Drops the reference to the current page writer. */
+   public void clearPageWriter() {
+     pageWriter = null;
+   }
+
+   public int getNumOfPages() {
+     return numOfPages;
+   }
+
+   public TSDataType getDataType() {
+     return dataType;
+   }
+
+   /**
+    * write the page to specified IOWriter.
+    *
+    * @param writer the specified IOWriter
+    * @throws IOException exception in IO, or when the number of bytes written differs from the
+    *     size of the page buffer
+    */
+   public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+     // NOTE(review): null slots do not update the statistics, so a chunk made only of null slots
+     // has count 0 and is skipped here even though its pages were sealed - confirm this is
+     // intended.
+     if (statistics.getCount() == 0) {
+       return;
+     }
+
+     // start to write this column chunk
+     writer.startFlushChunk(
+         measurementId,
+         compressionType,
+         dataType,
+         encodingType,
+         statistics,
+         pageBuffer.size(),
+         numOfPages,
+         0x40);
+
+     long dataOffset = writer.getPos();
+
+     // write all pages of this column
+     writer.writeBytesToStream(pageBuffer);
+
+     int dataSize = (int) (writer.getPos() - dataOffset);
+     if (dataSize != pageBuffer.size()) {
+       throw new IOException(
+           "Bytes written is inconsistent with the size of data: "
+               + dataSize
+               + " !="
+               + " "
+               + pageBuffer.size());
+     }
+
+     writer.endCurrentChunk();
+   }
+
+   /** only used for test */
+   public PublicBAOS getPageBuffer() {
+     return pageBuffer;
+   }
+ }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
new file mode 100644
index 0000000..af71ecd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class VectorChunkWriterImpl implements IChunkWriter {
+
+ private final TimeChunkWriter timeChunkWriter;
+ private final List<ValueChunkWriter> valueChunkWriterList;
+ private int valueIndex;
+
+ /** @param schema schema of this measurement */
+ public VectorChunkWriterImpl(IMeasurementSchema schema) {
+ timeChunkWriter =
+ new TimeChunkWriter(
+ schema.getMeasurementId(),
+ schema.getCompressor(),
+ schema.getTimeTSEncoding(),
+ schema.getTimeEncoder());
+
+ List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+ List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList();
+ List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList();
+ List<Encoder> valueEncoderList = schema.getValueEncoderList();
+
+ valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+ for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ valueMeasurementIdList.get(i),
+ schema.getCompressor(),
+ valueTSDataTypeList.get(i),
+ valueTSEncodingList.get(i),
+ valueEncoderList.get(i)));
+ }
+
+ this.valueIndex = 0;
+ }
+
+ @Override
+ public void write(long time, int value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, long value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, boolean value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, float value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, double value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time, Binary value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+ }
+
+ @Override
+ public void write(long time) {
+ valueIndex = 0;
+ timeChunkWriter.write(time);
+ if (checkPageSizeAndMayOpenANewPage()) {
+ writePageToPageBuffer();
+ }
+ }
+
+ // TODO tsfile write interface
+ @Override
+ public void write(long[] timestamps, int[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, long[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, float[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, double[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
+ * to pageBuffer
+ */
+ private boolean checkPageSizeAndMayOpenANewPage() {
+ return timeChunkWriter.checkPageSizeAndMayOpenANewPage();
+ }
+
+ private void writePageToPageBuffer() {
+ timeChunkWriter.writePageToPageBuffer();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+ timeChunkWriter.writeToFileWriter(tsfileWriter);
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writeToFileWriter(tsfileWriter);
+ }
+ }
+
+ @Override
+ public long estimateMaxSeriesMemSize() {
+ long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
+ }
+ return estimateMaxSeriesMemSize;
+ }
+
+ @Override
+ public long getCurrentChunkSize() {
+ long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ currentChunkSize += valueChunkWriter.getCurrentChunkSize();
+ }
+ return currentChunkSize;
+ }
+
+ @Override
+ public void sealCurrentPage() {
+ timeChunkWriter.sealCurrentPage();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.sealCurrentPage();
+ }
+ }
+
+ @Override
+ public void clearPageWriter() {
+ timeChunkWriter.clearPageWriter();
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.clearPageWriter();
+ }
+ }
+
+ @Override
+ public int getNumOfPages() {
+ return timeChunkWriter.getNumOfPages();
+ }
+
+ @Override
+ public TSDataType getDataType() {
+ return TSDataType.Vector;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 8467d15..369b809 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -265,6 +265,7 @@ public class PageWriter {
/** reset this page */
public void reset(MeasurementSchema measurementSchema) {
timeOut.reset();
+ timeEncoder = measurementSchema.getTimeEncoder();
valueOut.reset();
statistics = Statistics.getStatsByType(measurementSchema.getType());
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
new file mode 100644
index 0000000..5223d18
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.page;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This writer is used to write time into a page. It consists of a time encoder and respective
+ * OutputStream.
+ */
+ /**
+  * Writes the time column of a page. Timestamps go through a time {@link Encoder} into an
+  * in-memory stream while {@link TimeStatistics} are collected for the current page.
+  */
+ public class TimePageWriter {
+
+   private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class);
+
+   /** compressor applied to the encoded page body when the page is written out */
+   private final ICompressor compressor;
+
+   /** encoder for timestamps; can be replaced through {@link #setTimeEncoder(Encoder)} */
+   private Encoder timeEncoder;
+
+   /** encoded timestamps of the current page */
+   private final PublicBAOS timeOut;
+
+   /**
+    * statistic of current page. A fresh instance is installed by {@link #reset()} and {@link
+    * #initStatistics()}
+    */
+   private TimeStatistics statistics;
+
+   public TimePageWriter(Encoder timeEncoder, ICompressor compressor) {
+     this.compressor = compressor;
+     this.timeEncoder = timeEncoder;
+     this.timeOut = new PublicBAOS();
+     this.statistics = new TimeStatistics();
+   }
+
+   /** write a time into encoder */
+   public void write(long time) {
+     timeEncoder.encode(time, timeOut);
+     statistics.update(time);
+   }
+
+   /** write the first {@code batchSize} timestamps into encoder */
+   public void write(long[] timestamps, int batchSize) {
+     int idx = 0;
+     while (idx < batchSize) {
+       timeEncoder.encode(timestamps[idx], timeOut);
+       idx++;
+     }
+     statistics.update(timestamps, batchSize);
+   }
+
+   /**
+    * Flushes the encoder and returns a copy of the encoded timestamps written so far.
+    *
+    * @return a new readable ByteBuffer whose position is 0.
+    */
+   public ByteBuffer getUncompressedBytes() throws IOException {
+     // push whatever the encoder still holds into timeOut before copying
+     timeEncoder.flush(timeOut);
+
+     int encodedLength = timeOut.size();
+     ByteBuffer copy = ByteBuffer.allocate(encodedLength);
+     copy.put(timeOut.getBuf(), 0, encodedLength);
+     copy.flip();
+     return copy;
+   }
+
+   /**
+    * Appends this page to {@code pageBuffer}: uncompressed size, compressed size, the page
+    * statistics (only when {@code first} is false) and the page body, compressed unless the
+    * compressor type is {@code UNCOMPRESSED}. Nothing is written when the page holds no points.
+    *
+    * @param pageBuffer destination buffer
+    * @param first true to leave the statistics out of the header
+    * @return when {@code first} is true, the number of bytes taken by the two size fields;
+    *     otherwise 0
+    */
+   public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first)
+       throws IOException {
+     if (statistics.getCount() == 0) {
+       return 0;
+     }
+
+     ByteBuffer pageData = getUncompressedBytes();
+     int uncompressedSize = pageData.remaining();
+     boolean uncompressed = compressor.getType().equals(CompressionType.UNCOMPRESSED);
+
+     byte[] compressedBytes = null;
+     int compressedSize = uncompressedSize;
+     if (!uncompressed) {
+       compressedBytes = new byte[compressor.getMaxBytesForCompression(uncompressedSize)];
+       // data is never a directByteBuffer now, so we can use data.array()
+       compressedSize =
+           compressor.compress(
+               pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
+     }
+
+     // page header: both sizes always, statistics only for pages written with first == false
+     int sizeFieldBytes =
+         ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer);
+     sizeFieldBytes += ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer);
+     if (!first) {
+       statistics.serialize(pageBuffer);
+     }
+
+     // page body
+     logger.trace(
+         "start to flush a time page data into buffer, buffer position {} ", pageBuffer.size());
+     if (uncompressed) {
+       try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+         channel.write(pageData);
+       }
+     } else {
+       pageBuffer.write(compressedBytes, 0, compressedSize);
+     }
+     logger.trace(
+         "finish flushing a time page data into buffer, buffer position {} ", pageBuffer.size());
+
+     return first ? sizeFieldBytes : 0;
+   }
+
+   /**
+    * Estimates the memory this page may occupy: the bytes already in the time output stream plus
+    * the maximum number of bytes the time encoder reports.
+    *
+    * @return estimated size in bytes
+    */
+   public long estimateMaxMemSize() {
+     int buffered = timeOut.size();
+     return buffered + timeEncoder.getMaxByteSize();
+   }
+
+   /** reset this page: empties the time output stream and starts fresh statistics */
+   public void reset() {
+     timeOut.reset();
+     statistics = new TimeStatistics();
+   }
+
+   public void setTimeEncoder(Encoder encoder) {
+     this.timeEncoder = encoder;
+   }
+
+   /** Replaces the page statistics with a fresh instance. */
+   public void initStatistics() {
+     this.statistics = new TimeStatistics();
+   }
+
+   /** Returns the point count recorded in the page statistics. */
+   public long getPointNumber() {
+     return statistics.getCount();
+   }
+
+   public TimeStatistics getStatistics() {
+     return this.statistics;
+   }
+ }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
similarity index 69%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
index 8467d15..0fbfdbd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,21 +36,17 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
/**
- * This writer is used to write time-value into a page. It consists of a time encoder, a value
- * encoder and respective OutputStream.
+ * This writer is used to write value into a page. It consists of a value encoder and respective
+ * OutputStream.
*/
-public class PageWriter {
+public class ValuePageWriter {
+ private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class);
- private static final Logger logger = LoggerFactory.getLogger(PageWriter.class);
+ private final ICompressor compressor;
- private ICompressor compressor;
-
- // time
- private Encoder timeEncoder;
- private PublicBAOS timeOut;
// value
private Encoder valueEncoder;
- private PublicBAOS valueOut;
+ private final PublicBAOS valueOut;
/**
* statistic of current page. It will be reset after calling {@code
@@ -59,76 +54,101 @@ public class PageWriter {
*/
private Statistics<?> statistics;
- public PageWriter() {
- this(null, null);
- }
+ private byte bitmap;
- public PageWriter(MeasurementSchema measurementSchema) {
- this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder());
- this.statistics = Statistics.getStatsByType(measurementSchema.getType());
- this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
- }
+ private int size;
- private PageWriter(Encoder timeEncoder, Encoder valueEncoder) {
- this.timeOut = new PublicBAOS();
+ private final PublicBAOS bitmapOut;
+
+ private static final int MASK = 1 << 7;
+
+ public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) {
this.valueOut = new PublicBAOS();
- this.timeEncoder = timeEncoder;
+ this.bitmap = 0;
+ this.size = 0;
+ this.bitmapOut = new PublicBAOS();
this.valueEncoder = valueEncoder;
+ this.statistics = Statistics.getStatsByType(dataType);
+ this.compressor = compressor;
}
/** write a time value pair into encoder */
- public void write(long time, boolean value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, boolean value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, short value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, short value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, int value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, int value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, long value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, long value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, float value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, float value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, double value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, double value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
}
/** write a time value pair into encoder */
- public void write(long time, Binary value) {
- timeEncoder.encode(time, timeOut);
- valueEncoder.encode(value, valueOut);
- statistics.update(time, value);
+ public void write(long time, Binary value, boolean isNull) {
+ setBit(isNull);
+ if (!isNull) {
+ valueEncoder.encode(value, valueOut);
+ statistics.update(time, value);
+ }
+ }
+
+ private void setBit(boolean isNull) {
+ if (!isNull) {
+ bitmap |= (MASK >>> (size % 8));
+ }
+ size++;
+ if (size % 8 == 0) {
+ bitmapOut.write(bitmap);
+ bitmap = 0;
+ }
}
/** write time series into encoder */
public void write(long[] timestamps, boolean[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -137,7 +157,6 @@ public class PageWriter {
/** write time series into encoder */
public void write(long[] timestamps, int[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -146,7 +165,6 @@ public class PageWriter {
/** write time series into encoder */
public void write(long[] timestamps, long[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -155,7 +173,6 @@ public class PageWriter {
/** write time series into encoder */
public void write(long[] timestamps, float[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -164,7 +181,6 @@ public class PageWriter {
/** write time series into encoder */
public void write(long[] timestamps, double[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -173,7 +189,6 @@ public class PageWriter {
/** write time series into encoder */
public void write(long[] timestamps, Binary[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
- timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
statistics.update(timestamps, values, batchSize);
@@ -181,8 +196,10 @@ public class PageWriter {
/** flush all data remained in encoders. */
private void prepareEndWriteOnePage() throws IOException {
- timeEncoder.flush(timeOut);
valueEncoder.flush(valueOut);
+ if (size % 8 != 0) {
+ bitmapOut.write(bitmap);
+ }
}
/**
@@ -193,9 +210,9 @@ public class PageWriter {
*/
public ByteBuffer getUncompressedBytes() throws IOException {
prepareEndWriteOnePage();
- ByteBuffer buffer = ByteBuffer.allocate(timeOut.size() + valueOut.size() + 4);
- ReadWriteForEncodingUtils.writeUnsignedVarInt(timeOut.size(), buffer);
- buffer.put(timeOut.getBuf(), 0, timeOut.size());
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size());
+ buffer.putInt(size);
+ buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size());
buffer.put(valueOut.getBuf(), 0, valueOut.size());
buffer.flip();
return buffer;
@@ -204,7 +221,7 @@ public class PageWriter {
/** write the page header and data into the PageWriter's output stream. */
public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first)
throws IOException {
- if (statistics.getCount() == 0) {
+ if (size == 0) {
return 0;
}
@@ -256,21 +273,16 @@ public class PageWriter {
* @return allocated size in time, value and outputStream
*/
public long estimateMaxMemSize() {
- return timeOut.size()
- + valueOut.size()
- + timeEncoder.getMaxByteSize()
- + valueEncoder.getMaxByteSize();
+ return Integer.BYTES + bitmapOut.size() + 1 + valueOut.size() + valueEncoder.getMaxByteSize();
}
/** reset this page */
- public void reset(MeasurementSchema measurementSchema) {
- timeOut.reset();
+ public void reset(TSDataType dataType) {
+ bitmapOut.reset();
+ size = 0;
+ bitmap = 0;
valueOut.reset();
- statistics = Statistics.getStatsByType(measurementSchema.getType());
- }
-
- public void setTimeEncoder(Encoder encoder) {
- this.timeEncoder = encoder;
+ statistics = Statistics.getStatsByType(dataType);
}
public void setValueEncoder(Encoder encoder) {
@@ -288,4 +300,8 @@ public class PageWriter {
public Statistics<?> getStatistics() {
return statistics;
}
+
+ public int getSize() {
+ return size;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
index b607a79..e8b4bee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
@@ -47,7 +47,7 @@ public class BooleanDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
index b988ed7..853313e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
@@ -47,7 +47,7 @@ public class DoubleDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
index 3d13f17..863be98 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
@@ -47,7 +47,7 @@ public class FloatDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
index a66378e..02e0d5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
@@ -47,7 +47,7 @@ public class IntDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
index 4bc2e9f..8dce510 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
@@ -47,7 +47,7 @@ public class LongDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
index 3a3918c..cf371bc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
@@ -48,7 +48,7 @@ public class StringDataPoint extends DataPoint {
LOG.warn("given IChunkWriter is null, do nothing and return");
return;
}
- writer.write(time, value);
+ writer.write(time, value, false);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
new file mode 100644
index 0000000..855e5fc
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.schema;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.List;
+
+public interface IMeasurementSchema {
+
+ String getMeasurementId();
+
+ CompressionType getCompressor();
+
+ TSDataType getType();
+
+ TSEncoding getTimeTSEncoding();
+
+ Encoder getTimeEncoder();
+
+ List<String> getValueMeasurementIdList();
+
+ List<TSDataType> getValueTSDataTypeList();
+
+ List<TSEncoding> getValueTSEncodingList();
+
+ List<Encoder> getValueEncoderList();
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6dfeec8..c68c78b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +119,11 @@ public class TsFileIOWriter {
startFile();
}
+ /** for test only */
+ public TsFileIOWriter(TsFileOutput output, boolean test) {
+ this.out = output;
+ }
+
/**
* Writes given bytes to output stream. This method is called when total memory size exceeds the
* chunk group size threshold.
@@ -163,35 +167,37 @@ public class TsFileIOWriter {
/**
* start a {@linkplain ChunkMetadata ChunkMetaData}.
*
- * @param measurementSchema - schema of this time series
+ * @param measurementId - measurementId of this time series
* @param compressionCodecName - compression name of this time series
* @param tsDataType - data type
* @param statistics - Chunk statistics
* @param dataSize - the serialized size of all pages
+ * @param mask - 0x80 for time chunk, 0x40 for value chunk, 0x00 for common chunk
* @throws IOException if I/O error occurs
*/
public void startFlushChunk(
- MeasurementSchema measurementSchema,
+ String measurementId,
CompressionType compressionCodecName,
TSDataType tsDataType,
TSEncoding encodingType,
Statistics<?> statistics,
int dataSize,
- int numOfPages)
+ int numOfPages,
+ int mask)
throws IOException {
currentChunkMetadata =
- new ChunkMetadata(
- measurementSchema.getMeasurementId(), tsDataType, out.getPosition(), statistics);
+ new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
ChunkHeader header =
new ChunkHeader(
- measurementSchema.getMeasurementId(),
+ measurementId,
dataSize,
tsDataType,
compressionCodecName,
encodingType,
- numOfPages);
+ numOfPages,
+ mask);
header.serializeTo(out.wrapAsStream());
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 570ad8e..a0eb0e9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -62,12 +62,13 @@ public class TsFileIOWriterTest {
// chunk group 1
writer.startChunkGroup(deviceId);
writer.startFlushChunk(
- measurementSchema,
+ measurementSchema.getMeasurementId(),
measurementSchema.getCompressor(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
statistics,
0,
+ 0,
0);
writer.endCurrentChunk();
writer.endChunkGroup();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 64db027..e16ee55 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -131,13 +131,14 @@ public class RestorableTsFileIOWriterTest {
writer
.getIOWriter()
.startFlushChunk(
- new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN),
+ new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN).getMeasurementId(),
CompressionType.SNAPPY,
TSDataType.FLOAT,
TSEncoding.PLAIN,
new FloatStatistics(),
100,
- 10);
+ 10,
+ 0);
writer.getIOWriter().close();
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
new file mode 100644
index 0000000..df46b6f
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
@@ -0,0 +1,52 @@
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class TestTsFileOutput implements TsFileOutput {
+
+ PublicBAOS publicBAOS = new PublicBAOS();
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ publicBAOS.write(b);
+ }
+
+ @Override
+ public void write(byte b) {
+ publicBAOS.write(b);
+ }
+
+ @Override
+ public void write(ByteBuffer b) {
+ publicBAOS.write(b.array(), b.position(), b.limit());
+ }
+
+ @Override
+ public long getPosition() {
+ return publicBAOS.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ publicBAOS.close();
+ }
+
+ @Override
+ public OutputStream wrapAsStream() {
+ return publicBAOS;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ publicBAOS.flush();
+ }
+
+ @Override
+ public void truncate(long size) {
+ publicBAOS.truncate((int) size);
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
new file mode 100644
index 0000000..9968815
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class TimeChunkWriterTest {
+
+ @Test
+ public void testWrite1() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ TimeChunkWriter chunkWriter =
+ new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+ for (long time = 1; time <= 10; time++) {
+ chunkWriter.write(time);
+ }
+ assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage());
+ chunkWriter.sealCurrentPage();
+ // page without statistics size: 82 + chunk header size: 8
+ assertEquals(90L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals(
+ (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(82, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testWrite2() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ TimeChunkWriter chunkWriter =
+ new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+ for (long time = 1; time <= 10; time++) {
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+ for (long time = 11; time <= 20; time++) {
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+ assertEquals(2, chunkWriter.getNumOfPages());
+ // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9
+ assertEquals(207L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+ assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(198, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
new file mode 100644
index 0000000..2ec6294
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TimePageWriterTest {
+
+ @Test
+ public void testWrite() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ try {
+ pageWriter.write(1L);
+ assertEquals(8, pageWriter.estimateMaxMemSize());
+ ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+ pageWriter.reset();
+ assertEquals(0, pageWriter.estimateMaxMemSize());
+ byte[] timeBytes = new byte[8];
+ buffer.get(timeBytes);
+ ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes);
+ PlainDecoder decoder = new PlainDecoder();
+ assertEquals(1L, decoder.readLong(buffer2));
+ decoder.reset();
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // without page statistics
+ assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+ // total size
+ assertEquals(26, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ // compressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // with page statistics
+ assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+ // total size
+ assertEquals(43, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ // compressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ TimeStatistics testStatistics =
+ (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector);
+ assertEquals(1L, testStatistics.getStartTime());
+ assertEquals(3L, testStatistics.getEndTime());
+ assertEquals(3, testStatistics.getCount());
+ assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+ Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ PublicBAOS publicBAOS = new PublicBAOS();
+ try {
+ pageWriter.write(1L);
+ pageWriter.write(2L);
+ pageWriter.write(3L);
+ // without page statistics
+ assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+
+ // total size
+ assertEquals(22, publicBAOS.size());
+ TimeStatistics statistics = pageWriter.getStatistics();
+ assertEquals(1L, statistics.getStartTime());
+ assertEquals(3L, statistics.getEndTime());
+ assertEquals(3, statistics.getCount());
+ ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ // uncompressedSize
+ assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+ // compressedSize
+ assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+ byte[] compress = new byte[20];
+ compressedBuffer.get(compress);
+ byte[] uncompress = new byte[24];
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+ unCompressor.uncompress(compress, 0, 20, uncompress, 0);
+ ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+ assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+ } catch (IOException e) {
+ fail();
+ }
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
new file mode 100644
index 0000000..3cc8272
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ValueChunkWriterTest {
+
+ @Test
+ public void testWrite1() {
+ Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ "s1", CompressionType.UNCOMPRESSED, TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder);
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, time % 4 == 0);
+ }
+ chunkWriter.sealCurrentPage();
+ // page without statistics size: 69 + chunk header size: 8
+ assertEquals(77L, chunkWriter.getCurrentChunkSize());
+
+ try {
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(69, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(69, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testWrite2() {
+ Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ "s1", CompressionType.UNCOMPRESSED, TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder);
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, time % 4 == 0);
+ }
+ chunkWriter.sealCurrentPage();
+ for (int time = 20; time <= 40; time++) {
+ chunkWriter.write(time, (float) time, time % 4 == 0);
+ }
+ chunkWriter.sealCurrentPage();
+ // two pages with statistics size: (69 + 41) * 2 + chunk header size: 9
+ assertEquals(229L, chunkWriter.getCurrentChunkSize());
+
+ TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+ TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+ try {
+ chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+ PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+ ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+ assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+ assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+ assertEquals(220, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+ assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+ assertEquals(220, buffer.remaining());
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
new file mode 100644
index 0000000..a43159f
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ValuePageWriter} with a PLAIN-encoded FLOAT column.
+ *
+ * <p>The tests write points whose value equals their timestamp and mark every fourth point as
+ * null. The byte layout asserted throughout is: an int holding the number of points, a bitmap
+ * with one bit per point (highest bit first, set when the point is not null) and finally the
+ * non-null values.
+ */
+public class ValuePageWriterTest {
+
+  /** Tolerance used for all float comparisons. */
+  private static final float DELTA = 0.000001f;
+
+  /** A single non-null point: one bitmap byte with only the highest bit set, then the value. */
+  @Test
+  public void testWrite1() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.UNCOMPRESSED);
+    try {
+      pageWriter.write(1L, 1.0f, false);
+      assertEquals(9, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      // NOTE(review): the decoder is only created and reset; the values are read straight from
+      // the buffer. Kept as in the original test.
+      PlainDecoder decoder = new PlainDecoder();
+      assertPageData(buffer, 1, 0x80);
+      decoder.reset();
+    } catch (IOException e) {
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Sixteen points: the bitmap occupies exactly two bytes. */
+  @Test
+  public void testWrite2() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.UNCOMPRESSED);
+    try {
+      writePoints(pageWriter, 16);
+      assertEquals(55, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      // NOTE(review): decoder is created and reset only, see testWrite1.
+      PlainDecoder decoder = new PlainDecoder();
+      assertPageData(buffer, 16, 0xEE, 0xEE);
+      decoder.reset();
+    } catch (IOException e) {
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Twenty points: the third bitmap byte is only partially filled. */
+  @Test
+  public void testWrite3() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.UNCOMPRESSED);
+    try {
+      writePoints(pageWriter, 20);
+      assertEquals(67, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      // NOTE(review): decoder is created and reset only, see testWrite1.
+      PlainDecoder decoder = new PlainDecoder();
+      assertPageData(buffer, 20, 0xEE, 0xEE, 0xE0);
+      decoder.reset();
+    } catch (IOException e) {
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Uncompressed page written with {@code first == true}: no statistics in the page header. */
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.UNCOMPRESSED);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      writePoints(pageWriter, 20);
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+      // total size
+      assertEquals(69, publicBAOS.size());
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertStatisticsOfTwentyPoints(statistics);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+
+      // uncompressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+
+      // bitmap and values
+      assertPageData(buffer, 20, 0xEE, 0xEE, 0xE0);
+    } catch (IOException e) {
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Uncompressed page written with {@code first == false}: statistics follow the two sizes. */
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.UNCOMPRESSED);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      writePoints(pageWriter, 20);
+      // with page statistics
+      assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+      // total size
+      assertEquals(110, publicBAOS.size());
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertStatisticsOfTwentyPoints(statistics);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // uncompressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+
+      // Statistics
+      FloatStatistics testStatistics =
+          (FloatStatistics) FloatStatistics.deserialize(buffer, TSDataType.FLOAT);
+      assertStatisticsOfTwentyPoints(testStatistics);
+
+      // bitmap and values
+      assertPageData(buffer, 20, 0xEE, 0xEE, 0xE0);
+    } catch (IOException e) {
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Snappy-compressed page: the body must decompress to the same layout as the plain page. */
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+    ValuePageWriter pageWriter = newFloatPageWriter(CompressionType.SNAPPY);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      writePoints(pageWriter, 20);
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+      // total size
+      assertEquals(72, publicBAOS.size());
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertStatisticsOfTwentyPoints(statistics);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+
+      int uncompressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+      assertEquals(67, uncompressedSize);
+      int compressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+      assertEquals(70, compressedSize);
+
+      byte[] compress = new byte[compressedSize];
+      buffer.get(compress);
+      byte[] uncompress = new byte[uncompressedSize];
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+      unCompressor.uncompress(compress, 0, compressedSize, uncompress, 0);
+      ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+
+      // bitmap and values
+      assertPageData(uncompressedBuffer, 20, 0xEE, 0xEE, 0xE0);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Creates a page writer for PLAIN-encoded FLOAT values using the given compression. */
+  private static ValuePageWriter newFloatPageWriter(CompressionType compressionType) {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(compressionType);
+    return new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+  }
+
+  /** Writes the points {@code 1..pointCount}; every fourth point is written as null. */
+  private static void writePoints(ValuePageWriter pageWriter, int pointCount) {
+    for (int time = 1; time <= pointCount; time++) {
+      pageWriter.write(time, (float) time, time % 4 == 0);
+    }
+  }
+
+  /** Asserts the statistics expected after {@code writePoints(pageWriter, 20)}. */
+  private static void assertStatisticsOfTwentyPoints(Statistics<Float> statistics) {
+    assertEquals(1L, statistics.getStartTime());
+    assertEquals(19L, statistics.getEndTime());
+    // 20 points minus the 5 nulls
+    assertEquals(15, statistics.getCount());
+    assertEquals(1.0f, statistics.getFirstValue(), DELTA);
+    assertEquals(19.0f, statistics.getLastValue(), DELTA);
+    assertEquals(1.0f, statistics.getMinValue(), DELTA);
+    assertEquals(19.0f, statistics.getMaxValue(), DELTA);
+    assertEquals(150.0f, (float) statistics.getSumDoubleValue(), DELTA);
+  }
+
+  /**
+   * Reads a page body from {@code buffer} and asserts that it holds exactly the given point count,
+   * the given bitmap bytes and the non-null values produced by {@link #writePoints}.
+   *
+   * @param buffer buffer positioned at the start of the page body; must be exhausted afterwards
+   * @param pointCount expected number of points, including nulls
+   * @param expectedBitmap expected bitmap bytes, given as unsigned values
+   */
+  private static void assertPageData(ByteBuffer buffer, int pointCount, int... expectedBitmap) {
+    assertEquals(pointCount, ReadWriteIOUtils.readInt(buffer));
+    for (int bitmapByte : expectedBitmap) {
+      assertEquals((byte) bitmapByte, ReadWriteIOUtils.readByte(buffer));
+    }
+    for (int value = 1; value <= pointCount; value++) {
+      // null points are not stored in the value section
+      if (value % 4 != 0) {
+        assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), DELTA);
+      }
+    }
+    assertEquals(0, buffer.remaining());
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
new file mode 100644
index 0000000..93e18bb
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link VectorChunkWriterImpl} backed by {@link VectorMeasurementSchemaStub}: one time
+ * column plus the value columns s1 (FLOAT), s2 (INT32) and s3 (DOUBLE), all PLAIN-encoded and
+ * uncompressed.
+ *
+ * <p>Each test checks the size reported by the writer and then walks over the chunk headers in
+ * the flushed bytes. The time chunk marker is expected to carry the 0x80 flag and the value chunk
+ * markers the 0x40 flag.
+ */
+public class VectorChunkWriterImplTest {
+
+  /** One sealed page per column: every chunk uses the single-page chunk header marker. */
+  @Test
+  public void testWrite1() {
+    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+    writeRows(chunkWriter, 1, 20);
+
+    chunkWriter.sealCurrentPage();
+    // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80; value chunk 2: 8 + 2 + 4 + 3 +
+    // 20; value chunk 3: 9 + 4 + 7 + 20 * 8;
+    assertEquals(492L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeToFileWriter(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // time chunk
+      assertChunkHeader(
+          buffer, 0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, "s1.time", 164, TSDataType.Vector);
+      skip(buffer, 164);
+
+      // value chunk 1
+      assertChunkHeader(
+          buffer, 0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, "s1", 89, TSDataType.FLOAT);
+      skip(buffer, 89);
+
+      // value chunk 2
+      assertChunkHeader(
+          buffer, 0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, "s2", 29, TSDataType.INT32);
+      skip(buffer, 29);
+
+      // value chunk 3
+      assertChunkHeader(
+          buffer, 0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, "s3", 171, TSDataType.DOUBLE);
+      assertEquals(171, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      // report the cause in the failure itself instead of failing without a message
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /** Two sealed pages per column: every chunk uses the regular chunk header marker. */
+  @Test
+  public void testWrite2() {
+    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+    writeRows(chunkWriter, 1, 20);
+    chunkWriter.sealCurrentPage();
+    writeRows(chunkWriter, 21, 40);
+    chunkWriter.sealCurrentPage();
+
+    // time chunk: 14 + (4 + 17 + 160) * 2
+    // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
+    // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
+    // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
+    assertEquals(1259L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeToFileWriter(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // time chunk
+      assertChunkHeader(buffer, 0x80 | MetaMarker.CHUNK_HEADER, "s1.time", 362, TSDataType.Vector);
+      skip(buffer, 362);
+
+      // value chunk 1
+      assertChunkHeader(buffer, 0x40 | MetaMarker.CHUNK_HEADER, "s1", 260, TSDataType.FLOAT);
+      skip(buffer, 260);
+
+      // value chunk 2
+      assertChunkHeader(buffer, 0x40 | MetaMarker.CHUNK_HEADER, "s2", 140, TSDataType.INT32);
+      skip(buffer, 140);
+
+      // value chunk 3
+      assertChunkHeader(buffer, 0x40 | MetaMarker.CHUNK_HEADER, "s3", 456, TSDataType.DOUBLE);
+      assertEquals(456, buffer.remaining());
+
+    } catch (IOException e) {
+      e.printStackTrace();
+      // report the cause in the failure itself instead of failing without a message
+      fail("Unexpected IOException: " + e);
+    }
+  }
+
+  /**
+   * Writes one row per timestamp in {@code firstTime..lastTime}: a non-null FLOAT, INT32 and
+   * DOUBLE value equal to the timestamp, followed by the timestamp itself.
+   */
+  private static void writeRows(VectorChunkWriterImpl chunkWriter, int firstTime, int lastTime) {
+    for (int time = firstTime; time <= lastTime; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+  }
+
+  /**
+   * Reads one chunk header from {@code buffer} and asserts its fields; compression is expected to
+   * be UNCOMPRESSED and the encoding PLAIN.
+   *
+   * @param buffer buffer positioned at the marker byte of the chunk header
+   * @param expectedMarker expected marker byte, given as an unsigned value
+   * @param expectedMeasurementId expected measurement id
+   * @param expectedDataSize expected size of the chunk data that follows the header
+   * @param expectedDataType expected data type of the chunk
+   */
+  private static void assertChunkHeader(
+      ByteBuffer buffer,
+      int expectedMarker,
+      String expectedMeasurementId,
+      int expectedDataSize,
+      TSDataType expectedDataType) {
+    assertEquals((byte) expectedMarker, ReadWriteIOUtils.readByte(buffer));
+    assertEquals(expectedMeasurementId, ReadWriteIOUtils.readVarIntString(buffer));
+    assertEquals(expectedDataSize, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+    assertEquals(expectedDataType.serialize(), ReadWriteIOUtils.readByte(buffer));
+    assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+    assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+  }
+
+  /** Advances the buffer position by {@code byteCount} bytes, skipping a chunk's data. */
+  private static void skip(ByteBuffer buffer, int byteCount) {
+    buffer.position(buffer.position() + byteCount);
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
new file mode 100644
index 0000000..795a0a6
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class VectorMeasurementSchemaStub implements IMeasurementSchema {
+
+ @Override
+ public String getMeasurementId() {
+ return "s1.time";
+ }
+
+ @Override
+ public CompressionType getCompressor() {
+ return CompressionType.UNCOMPRESSED;
+ }
+
+ @Override
+ public TSDataType getType() {
+ return TSDataType.Vector;
+ }
+
+ @Override
+ public TSEncoding getTimeTSEncoding() {
+ return TSEncoding.PLAIN;
+ }
+
+ @Override
+ public Encoder getTimeEncoder() {
+ return new PlainEncoder(TSDataType.INT64, 0);
+ }
+
+ @Override
+ public List<String> getValueMeasurementIdList() {
+ return Arrays.asList("s1", "s2", "s3");
+ }
+
+ @Override
+ public List<TSDataType> getValueTSDataTypeList() {
+ return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
+ }
+
+ @Override
+ public List<TSEncoding> getValueTSEncodingList() {
+ return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
+ }
+
+ @Override
+ public List<Encoder> getValueEncoderList() {
+ return Arrays.asList(
+ new PlainEncoder(TSDataType.FLOAT, 0),
+ new PlainEncoder(TSDataType.INT32, 0),
+ new PlainEncoder(TSDataType.DOUBLE, 0));
+ }
+}