You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 06:33:35 UTC
[iotdb] 01/01: TsBlock pre implementation
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e61858d38da8e6753a5f1ba314c3aaab806a4f18
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 11:35:54 2022 +0800
TsBlock pre implementation
---
pom.xml | 5 +
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 2 +-
.../apache/iotdb/db/mpp/buffer/ISourceHandle.java | 2 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 2 +-
.../org/apache/iotdb/db/mpp/operator/Operator.java | 2 +-
.../db/mpp/operator/process/AggregateOperator.java | 2 +-
.../mpp/operator/process/DeviceMergeOperator.java | 2 +-
.../db/mpp/operator/process/FillOperator.java | 2 +-
.../mpp/operator/process/FilterNullOperator.java | 2 +-
.../mpp/operator/process/GroupByLevelOperator.java | 2 +-
.../db/mpp/operator/process/LimitOperator.java | 6 +-
.../db/mpp/operator/process/OffsetOperator.java | 2 +-
.../db/mpp/operator/process/SortOperator.java | 2 +-
.../db/mpp/operator/process/TimeJoinOperator.java | 36 ++-
.../db/mpp/operator/sink/FragmentSinkOperator.java | 2 +-
.../iotdb/db/mpp/operator/sink/SinkOperator.java | 2 +-
.../mpp/operator/source/AlignedSeriesScanUtil.java | 88 ++++++
.../source/SeriesAggregateScanOperator.java | 2 +-
.../db/mpp/operator/source/SeriesScanOperator.java | 4 +-
.../db/mpp/operator/source/SeriesScanUtil.java | 56 ++--
.../query/reader/chunk/MemAlignedPageReader.java | 38 ++-
.../iotdb/db/query/reader/chunk/MemPageReader.java | 113 +++++---
tsfile/pom.xml | 4 +
.../apache/iotdb/tsfile/read/common/Column.java | 21 --
.../iotdb/tsfile/read/common/TimeColumn.java | 21 --
.../apache/iotdb/tsfile/read/common/TsBlock.java | 175 ------------
.../iotdb/tsfile/read/common/block/TsBlock.java | 317 +++++++++++++++++++++
.../tsfile/read/common/block/TsBlockBuilder.java | 295 +++++++++++++++++++
.../read/common/block/TsBlockBuilderStatus.java | 75 +++++
.../read/common/{ => block}/TsBlockMetadata.java | 2 +-
.../read/common/block/column/BinaryColumn.java | 110 +++++++
.../common/block/column/BinaryColumnBuilder.java | 148 ++++++++++
.../read/common/block/column/BooleanColumn.java | 109 +++++++
.../common/block/column/BooleanColumnBuilder.java | 150 ++++++++++
.../tsfile/read/common/block/column/Column.java | 87 ++++++
.../read/common/block/column/ColumnBuilder.java | 81 ++++++
.../common/block/column/ColumnBuilderStatus.java | 88 ++++++
.../read/common/block/column/ColumnUtil.java | 97 +++++++
.../read/common/block/column/DoubleColumn.java | 109 +++++++
.../common/block/column/DoubleColumnBuilder.java | 150 ++++++++++
.../read/common/block/column/FloatColumn.java | 108 +++++++
.../common/block/column/FloatColumnBuilder.java | 150 ++++++++++
.../tsfile/read/common/block/column/IntColumn.java | 108 +++++++
.../read/common/block/column/IntColumnBuilder.java | 150 ++++++++++
.../read/common/block/column/LongColumn.java | 108 +++++++
.../common/block/column/LongColumnBuilder.java | 150 ++++++++++
.../block/column/RunLengthEncodedColumn.java | 133 +++++++++
.../read/common/block/column/TimeColumn.java | 95 ++++++
.../common/block/column/TimeColumnBuilder.java | 126 ++++++++
.../iotdb/tsfile/read/reader/IPageReader.java | 2 +-
.../tsfile/read/reader/page/AlignedPageReader.java | 38 ++-
.../iotdb/tsfile/read/reader/page/PageReader.java | 81 ++++--
52 files changed, 3323 insertions(+), 339 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3f1c875..f19c018 100644
--- a/pom.xml
+++ b/pom.xml
@@ -506,6 +506,11 @@
<version>${airline.version}</version>
</dependency>
<dependency>
+ <groupId>org.openjdk.jol</groupId>
+ <artifactId>jol-core</artifactId>
+ <version>0.2</version>
+ </dependency>
+ <dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 0d64cbc..6b4e07c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.buffer;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index b493759..ef2919c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.buffer;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 0216556..5f8607e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.buffer;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index d545c4f..df89218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index c439ad4..a08fb68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index b9fce3a..ebfb54e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index e2732cf..52a3fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index d8a33ba..0083ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 6e19233..285427a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 47c81bc..25beb6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
@@ -56,8 +56,8 @@ public class LimitOperator implements ProcessOperator {
public TsBlock next() throws IOException {
TsBlock block = child.next();
TsBlock res = block;
- if (block.getCount() <= remainingLimit) {
- remainingLimit -= block.getCount();
+ if (block.getPositionCount() <= remainingLimit) {
+ remainingLimit -= block.getPositionCount();
} else {
res = block.getRegion(0, (int) remainingLimit);
remainingLimit = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index e9eef20..a22985c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index f0ac49b..5e2f0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index c5665ee..60fa4f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -22,8 +22,13 @@ import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.read.common.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
@@ -48,11 +53,14 @@ public class TimeJoinOperator implements ProcessOperator {
private final int columnCount;
+ private final List<TSDataType> dataTypes;
+
public TimeJoinOperator(
OperatorContext operatorContext,
List<Operator> children,
OrderBy mergeOrder,
- int columnCount) {
+ int columnCount,
+ List<TSDataType> dataTypes) {
this.operatorContext = operatorContext;
this.children = children;
this.inputCount = children.size();
@@ -61,6 +69,7 @@ public class TimeJoinOperator implements ProcessOperator {
this.noMoreTsBlocks = new boolean[this.inputCount];
this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
this.columnCount = columnCount;
+ this.dataTypes = dataTypes;
}
@Override
@@ -92,7 +101,7 @@ public class TimeJoinOperator implements ProcessOperator {
inputIndex[i] = 0;
inputTsBlocks[i] = children.get(i).next();
if (!empty(i)) {
- int rowSize = inputTsBlocks[i].getCount();
+ int rowSize = inputTsBlocks[i].getPositionCount();
for (int row = 0; row < rowSize; row++) {
timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
}
@@ -113,22 +122,29 @@ public class TimeJoinOperator implements ProcessOperator {
return null;
}
- TsBlock res = new TsBlock(columnCount);
+ TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) {
- res.addTime(timeSelector.pollFirst());
+ timeBuilder.writeLong(timeSelector.pollFirst());
}
+ tsBlockBuilder.buildValueColumnBuilders(dataTypes);
+
for (int i = 0, column = 0; i < inputCount; i++) {
TsBlock block = inputTsBlocks[i];
TimeColumn timeColumn = block.getTimeColumn();
int valueColumnCount = block.getValueColumnCount();
int startIndex = inputIndex[i];
for (int j = 0; j < valueColumnCount; j++) {
- inputIndex[i] =
- res.addValues(column++, timeColumn, block.getColumn(j), startIndex, currentEndTime);
+ startIndex = inputIndex[i];
+ ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(column++);
+ Column valueColumn = block.getColumn(j);
+ startIndex = columnBuilder.appendColumn(timeColumn, valueColumn, startIndex, timeBuilder);
}
+ inputIndex[i] = startIndex;
}
- return res;
+ return tsBlockBuilder.build();
}
@Override
@@ -156,6 +172,6 @@ public class TimeJoinOperator implements ProcessOperator {
private boolean empty(int columnIndex) {
return inputTsBlocks[columnIndex] == null
- || inputTsBlocks[columnIndex].getCount() == inputIndex[columnIndex];
+ || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex];
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index 7be5cfa..7bbccbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.sink;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index e5a3e15..b691db0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator.sink;
import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
public interface SinkOperator extends Operator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
new file mode 100644
index 0000000..9fd48c7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class AlignedSeriesScanUtil extends SeriesScanUtil {
+
+ private final List<TSDataType> dataTypes;
+
+ public AlignedSeriesScanUtil(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ FragmentInstanceContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ super(
+ seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter, ascending);
+ dataTypes =
+ ((AlignedPath) seriesPath)
+ .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+ }
+
+ @Override
+ protected PriorityMergeReader getPriorityMergeReader() {
+ return new AlignedPriorityMergeReader();
+ }
+
+ @Override
+ protected DescPriorityMergeReader getDescPriorityMergeReader() {
+ return new AlignedDescPriorityMergeReader();
+ }
+
+ @Override
+ protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, (AlignedPath) seriesPath, context, filter);
+ }
+
+ @Override
+ protected List<TSDataType> getTsDataTypeList() {
+ return dataTypes;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 0ef6b09..87151ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.source;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 19aa191..e84e649 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
@@ -127,6 +127,6 @@ public class SeriesScanOperator implements Operator {
}
private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || !tsBlock.hasNext();
+ return tsBlock == null || tsBlock.isEmpty();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 9e774dc..e3d2f9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -35,7 +35,9 @@ import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -389,7 +391,7 @@ public class SeriesScanUtil {
} else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
if (hasNextOverlappedPage()) {
cachedTsBlock = nextOverlappedPage();
- if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+ if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
hasCachedNextOverlappedPage = true;
return true;
}
@@ -440,7 +442,7 @@ public class SeriesScanUtil {
*/
if (hasNextOverlappedPage()) {
cachedTsBlock = nextOverlappedPage();
- if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+ if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
hasCachedNextOverlappedPage = true;
return true;
}
@@ -644,7 +646,8 @@ public class SeriesScanUtil {
if (mergeReader.hasNextTimeValuePair()) {
// TODO we still need to consider data type, ascending and descending here
- cachedTsBlock = new TsBlock();
+ TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
+ TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
while (mergeReader.hasNextTimeValuePair()) {
@@ -662,7 +665,7 @@ public class SeriesScanUtil {
* 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
* we could use the first sequence page reader later
*/
- if (cachedTsBlock.hasNext() || firstPageReader != null || !seqPageReaders.isEmpty()) {
+ if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) {
break;
}
// so, we don't have other data except mergeReader
@@ -689,7 +692,8 @@ public class SeriesScanUtil {
|| (!orderUtils.getAscending()
&& timeValuePair.getTimestamp()
< firstPageReader.getStatistics().getStartTime())) {
- hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+ hasCachedNextOverlappedPage = !builder.isEmpty();
+ cachedTsBlock = builder.build();
return hasCachedNextOverlappedPage;
} else if (orderUtils.isOverlapped(
timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
@@ -716,7 +720,8 @@ public class SeriesScanUtil {
|| (!orderUtils.getAscending()
&& timeValuePair.getTimestamp()
< seqPageReaders.get(0).getStatistics().getStartTime())) {
- hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+ hasCachedNextOverlappedPage = !builder.isEmpty();
+ cachedTsBlock = builder.build();
return hasCachedNextOverlappedPage;
} else if (orderUtils.isOverlapped(
timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
@@ -752,37 +757,40 @@ public class SeriesScanUtil {
if (valueFilter == null
|| valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+ builder.declarePosition();
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
switch (dataType) {
case BOOLEAN:
- // tsBlock.putBoolean(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getBoolean());
+ builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
break;
case INT32:
- // tsBlock.putInt(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getInt());
+ builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
break;
case INT64:
- // tsBlock.putLong(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getLong());
+ builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
break;
case FLOAT:
- // tsBlock.putFloat(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getFloat());
+ builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
break;
case DOUBLE:
- // tsBlock.putDouble(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getDouble());
+ builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
break;
case TEXT:
- // tsBlock.putBinary(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getBinary());
+ builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+ break;
+ case VECTOR:
+ TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ for (int i = 0; i < values.length; i++) {
+ builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
break;
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
}
- hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+ hasCachedNextOverlappedPage = builder.isEmpty();
+ cachedTsBlock = builder.build();
/*
* if current overlapped page has valid data, return, otherwise read next overlapped page
*/
@@ -1006,7 +1014,7 @@ public class SeriesScanUtil {
}
}
- protected void unpackSeqTsFileResource() throws IOException {
+ private void unpackSeqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(
orderUtils.getNextSeqFileResource(true),
@@ -1020,7 +1028,7 @@ public class SeriesScanUtil {
}
}
- protected void unpackUnseqTsFileResource() throws IOException {
+ private void unpackUnseqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(
orderUtils.getNextUnseqFileResource(true),
@@ -1046,6 +1054,10 @@ public class SeriesScanUtil {
resource, seriesPath, context, filter, allSensors);
}
+ protected List<TSDataType> getTsDataTypeList() {
+ return Collections.singletonList(dataType);
+ }
+
protected Filter getAnyFilter() {
return timeFilter != null ? timeFilter : valueFilter;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 7216ea7..e7ebc1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -33,6 +35,7 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
+import java.util.stream.Collectors;
public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
@@ -81,8 +84,37 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
@Override
public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
- // TODO need to implement for mpp
- throw new IllegalStateException("We have not implemented this method.");
+ // TODO change from the row-based style to column-based style
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ chunkMetadata.getValueChunkMetadataList().stream()
+ .map(IChunkMetadata::getDataType)
+ .collect(Collectors.toList()));
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ // save the first not null value of each row
+ Object firstNotNullObject = null;
+ for (TsPrimitiveType value : values) {
+ if (value != null) {
+ firstNotNullObject = value.getValue();
+ break;
+ }
+ }
+ // if all the sub sensors' value are null in current time
+ // or current row is not satisfied with the filter, just discard it
+ // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will only
+ // accept AlignedPath with only one sub sensor
+ if (firstNotNullObject != null
+ && (valueFilter == null
+ || valueFilter.satisfy(timeValuePair.getTimestamp(), firstNotNullObject))) {
+ builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
+ for (int i = 0; i < values.length; i++) {
+ builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
+ }
+ }
+ return builder.build();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 79997b5..16704a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -25,13 +25,17 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
+import java.util.Collections;
public class MemPageReader implements IPageReader {
@@ -66,43 +70,80 @@ public class MemPageReader implements IPageReader {
TSDataType dataType = chunkMetadata.getDataType();
// TODO we still need to consider data type, ascending and descending here
- TsBlock tsBlock = new TsBlock();
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
- switch (dataType) {
- case BOOLEAN:
- // tsBlock.putBoolean(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- // tsBlock.putInt(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getInt());
- break;
- case INT64:
- // tsBlock.putLong(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- // tsBlock.putFloat(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- // tsBlock.putDouble(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- // tsBlock.putBinary(timeValuePair.getTimestamp(),
- // timeValuePair.getValue().getBinary());
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
+ switch (dataType) {
+ case BOOLEAN:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeBoolean(timeValuePair.getValue().getBoolean());
+ }
}
- }
+ break;
+ case INT32:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeInt(timeValuePair.getValue().getInt());
+ }
+ }
+ break;
+ case INT64:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeLong(timeValuePair.getValue().getLong());
+ }
+ }
+ break;
+ case FLOAT:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeFloat(timeValuePair.getValue().getFloat());
+ }
+ }
+ break;
+ case DOUBLE:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeDouble(timeValuePair.getValue().getDouble());
+ }
+ }
+ break;
+ case TEXT:
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ if (valueFilter == null
+ || valueFilter.satisfy(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ valueBuilder.writeBinary(timeValuePair.getValue().getBinary());
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
- return tsBlock;
+ return tsBlockBuilder.build();
}
@Override
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index beb2149..147ed65 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -68,6 +68,10 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.openjdk.jol</groupId>
+ <artifactId>jol-core</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
deleted file mode 100644
index c4933e1..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public interface Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
deleted file mode 100644
index 19937f6..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public class TimeColumn implements Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
deleted file mode 100644
index a6bffbf..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
-
-import static java.lang.String.format;
-
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
-
- // Describe the column info
- private TsBlockMetadata metadata;
-
- private TimeColumn timeColumn;
-
- private Column[] valueColumns;
-
- private int count;
-
- public TsBlock() {}
-
- public TsBlock(int columnCount) {
- timeColumn = new TimeColumn();
- valueColumns = new Column[columnCount];
- }
-
- public boolean hasNext() {
- return false;
- }
-
- public void next() {}
-
- public TsBlockMetadata getMetadata() {
- return metadata;
- }
-
- public int getCount() {
- return count;
- }
-
- /** TODO need to be implemented after the data structure being defined */
- public long getEndTime() {
- return -1;
- }
-
- public boolean isEmpty() {
- return count == 0;
- }
-
- /**
- * TODO has not been implemented yet
- *
- * @param positionOffset start offset
- * @param length slice length
- * @return view of current TsBlock start from positionOffset to positionOffset + length
- */
- public TsBlock getRegion(int positionOffset, int length) {
- if (positionOffset < 0 || length < 0 || positionOffset + length > count) {
- throw new IndexOutOfBoundsException(
- format(
- "Invalid position %s and length %s in page with %s positions",
- positionOffset, length, count));
- }
- return this;
- }
-
- /** TODO need to be implemented after the data structure being defined */
- public long getTimeByIndex(int index) {
- return -1;
- }
-
- public void addTime(long time) {}
-
- public int getValueColumnCount() {
- return valueColumns.length;
- }
-
- public TimeColumn getTimeColumn() {
- return timeColumn;
- }
-
- public Column getColumn(int columnIndex) {
- return valueColumns[columnIndex];
- }
-
- public int addValues(
- int columnIndex, TimeColumn timeColumn, Column valueColumn, int rowIndex, long endTime) {
-
- return rowIndex;
- }
-
- public TsBlockIterator getTsBlockIterator() {
- return new TsBlockIterator();
- }
-
- // TODO need to be implemented when the data structure is defined
- private class TsBlockIterator implements IPointReader, IBatchDataIterator {
-
- @Override
- public boolean hasNext() {
- return TsBlock.this.hasNext();
- }
-
- @Override
- public boolean hasNext(long minBound, long maxBound) {
- return hasNext();
- }
-
- @Override
- public void next() {
- TsBlock.this.next();
- }
-
- @Override
- public long currentTime() {
- return -1;
- }
-
- @Override
- public Object currentValue() {
- return null;
- }
-
- @Override
- public void reset() {}
-
- @Override
- public int totalLength() {
- return TsBlock.this.getCount();
- }
-
- @Override
- public boolean hasNextTimeValuePair() {
- return hasNext();
- }
-
- @Override
- public TimeValuePair nextTimeValuePair() {
- return null;
- }
-
- @Override
- public TimeValuePair currentTimeValuePair() {
- return null;
- }
-
- @Override
- public void close() {}
- }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
new file mode 100644
index 0000000..9c38360
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
+ * and constructs them as a row based view The columns can be series, aggregation result for one
+ * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
+ * the columns.
+ */
+public class TsBlock {
+
+ public static final int INSTANCE_SIZE = ClassLayout.parseClass(TsBlock.class).instanceSize();
+
+ private static final Column[] EMPTY_COLUMNS = new Column[0];
+
+ /**
+ * Visible to give trusted classes like {@link TsBlockBuilder} access to a constructor that
+ * doesn't defensively copy the valueColumns
+ */
+ static TsBlock wrapBlocksWithoutCopy(
+ int positionCount, TimeColumn timeColumn, Column[] valueColumns) {
+ return new TsBlock(false, positionCount, timeColumn, valueColumns);
+ }
+
+ // TODO rethink about if we really need this field
+ // Describe the column info
+ private TsBlockMetadata metadata;
+
+ private final TimeColumn timeColumn;
+
+ private final Column[] valueColumns;
+
+ private final int positionCount;
+
+ private volatile long retainedSizeInBytes = -1;
+
+ public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+ this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+ }
+
+ public TsBlock(int positionCount) {
+ this(false, positionCount, null, EMPTY_COLUMNS);
+ }
+
+ public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
+ this(true, positionCount, timeColumn, valueColumns);
+ }
+
+ private TsBlock(
+ boolean columnsCopyRequired,
+ int positionCount,
+ TimeColumn timeColumn,
+ Column[] valueColumns) {
+ requireNonNull(valueColumns, "blocks is null");
+ this.positionCount = positionCount;
+ this.timeColumn = timeColumn;
+ if (valueColumns.length == 0) {
+ this.valueColumns = EMPTY_COLUMNS;
+ // Empty blocks are not considered "retained" by any particular page
+ this.retainedSizeInBytes = INSTANCE_SIZE;
+ } else {
+ this.valueColumns = columnsCopyRequired ? valueColumns.clone() : valueColumns;
+ }
+ }
+
+ public boolean hasNext() {
+ return false;
+ }
+
+ public void next() {}
+
+ public TsBlockMetadata getMetadata() {
+ return metadata;
+ }
+
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ public long getEndTime() {
+ return timeColumn.getEndTime();
+ }
+
+ public boolean isEmpty() {
+ return positionCount == 0;
+ }
+
+ public long getRetainedSizeInBytes() {
+ long retainedSizeInBytes = this.retainedSizeInBytes;
+ if (retainedSizeInBytes < 0) {
+ return updateRetainedSize();
+ }
+ return retainedSizeInBytes;
+ }
+
+ /**
+ * @param positionOffset start offset
+ * @param length slice length
+ * @return view of current TsBlock start from positionOffset to positionOffset + length
+ */
+ public TsBlock getRegion(int positionOffset, int length) {
+ if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
+ throw new IndexOutOfBoundsException(
+ format(
+ "Invalid position %s and length %s in page with %s positions",
+ positionOffset, length, positionCount));
+ }
+ int channelCount = getValueColumnCount();
+ Column[] slicedColumns = new Column[channelCount];
+ for (int i = 0; i < channelCount; i++) {
+ slicedColumns[i] = valueColumns[i].getRegion(positionOffset, length);
+ }
+ return wrapBlocksWithoutCopy(
+ length, (TimeColumn) timeColumn.getRegion(positionOffset, length), slicedColumns);
+ }
+
+ public TsBlock appendValueColumn(Column column) {
+ requireNonNull(column, "column is null");
+ if (positionCount != column.getPositionCount()) {
+ throw new IllegalArgumentException("Block does not have same position count");
+ }
+
+ Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
+ newBlocks[valueColumns.length] = column;
+ return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+ }
+
+ public long getTimeByIndex(int index) {
+ return timeColumn.getLong(index);
+ }
+
+ public int getValueColumnCount() {
+ return valueColumns.length;
+ }
+
+ public TimeColumn getTimeColumn() {
+ return timeColumn;
+ }
+
+ public Column getColumn(int columnIndex) {
+ return valueColumns[columnIndex];
+ }
+
+ public TsBlockIterator getTsBlockIterator() {
+ return new TsBlockIterator(0);
+ }
+
+ /** Only used for the batch data of vector time series. */
+ public IBatchDataIterator getBatchDataIterator(int subIndex) {
+ return new AlignedTsBlockIterator(0, subIndex);
+ }
+
+ private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+
+ protected int index;
+
+ public TsBlockIterator(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < positionCount;
+ }
+
+ @Override
+ public boolean hasNext(long minBound, long maxBound) {
+ return hasNext();
+ }
+
+ @Override
+ public void next() {
+ index++;
+ }
+
+ @Override
+ public long currentTime() {
+ return timeColumn.getLong(index);
+ }
+
+ @Override
+ public Object currentValue() {
+ return valueColumns[0].getTsPrimitiveType(index).getValue();
+ }
+
+ @Override
+ public void reset() {
+ index = 0;
+ }
+
+ @Override
+ public int totalLength() {
+ return positionCount;
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ return hasNext();
+ }
+
+ @Override
+ public TimeValuePair nextTimeValuePair() {
+ TimeValuePair res = currentTimeValuePair();
+ next();
+ return res;
+ }
+
+ @Override
+ public TimeValuePair currentTimeValuePair() {
+ return new TimeValuePair(
+ timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ private class AlignedTsBlockIterator extends TsBlockIterator {
+
+ private final int subIndex;
+
+ private AlignedTsBlockIterator(int index, int subIndex) {
+ super(index);
+ this.subIndex = subIndex;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (super.hasNext() && currentValue() == null) {
+ super.next();
+ }
+ return super.hasNext();
+ }
+
+ @Override
+ public boolean hasNext(long minBound, long maxBound) {
+ while (super.hasNext() && currentValue() == null) {
+ if (currentTime() < minBound || currentTime() >= maxBound) {
+ break;
+ }
+ super.next();
+ }
+ return super.hasNext();
+ }
+
+ @Override
+ public Object currentValue() {
+ TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+ return v == null ? null : v.getValue();
+ }
+
+ @Override
+ public int totalLength() {
+ // aligned timeseries' BatchData length() may return the length of time column
+ // we need traverse to VectorBatchDataIterator calculate the actual value column's length
+ int cnt = 0;
+ int indexSave = index;
+ while (hasNext()) {
+ cnt++;
+ next();
+ }
+ index = indexSave;
+ return cnt;
+ }
+ }
+
+ private long updateRetainedSize() {
+ long retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueColumns);
+ retainedSizeInBytes += timeColumn.getRetainedSizeInBytes();
+ for (Column column : valueColumns) {
+ retainedSizeInBytes += column.getRetainedSizeInBytes();
+ }
+ this.retainedSizeInBytes = retainedSizeInBytes;
+ return retainedSizeInBytes;
+ }
+
+ private static int determinePositionCount(Column... columns) {
+ requireNonNull(columns, "columns is null");
+ if (columns.length == 0) {
+ throw new IllegalArgumentException("columns is empty");
+ }
+
+ return columns[0].getPositionCount();
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
new file mode 100644
index 0000000..9e615cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.*;
+
+import java.util.List;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class TsBlockBuilder {
+
+ // We choose default initial size to be 8 for TsBlockBuilder and ColumnBuilder
+ // so the underlying data is larger than the object overhead, and the size is power of 2.
+ //
+ // This could be any other small number.
+ private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
+
+ private TimeColumnBuilder timeColumnBuilder;
+ private ColumnBuilder[] valueColumnBuilders;
+ private List<TSDataType> types;
+ private TsBlockBuilderStatus tsBlockBuilderStatus;
+ private int declaredPositions;
+
+ private TsBlockBuilder() {}
+
+ /**
+ * Create a TsBlockBuilder with given types.
+ *
+ * <p>A TsBlockBuilder instance created with this constructor has no estimation about bytes per
+ * entry, therefore it can resize frequently while appending new rows.
+ *
+ * <p>This constructor should only be used to get the initial TsBlockBuilder. Once the
+ * TsBlockBuilder is full use reset() or createTsBlockBuilderLike() to create a new TsBlockBuilder
+ * instance with its size estimated based on previous data.
+ */
+ public TsBlockBuilder(List<TSDataType> types) {
+ this(DEFAULT_INITIAL_EXPECTED_ENTRIES, types);
+ }
+
+ public TsBlockBuilder(int initialExpectedEntries, List<TSDataType> types) {
+ this(initialExpectedEntries, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, types);
+ }
+
+ public static TsBlockBuilder createWithOnlyTimeColumn() {
+ TsBlockBuilder res = new TsBlockBuilder();
+ res.tsBlockBuilderStatus = new TsBlockBuilderStatus(DEFAULT_INITIAL_EXPECTED_ENTRIES);
+ res.timeColumnBuilder =
+ new TimeColumnBuilder(
+ res.tsBlockBuilderStatus.createColumnBuilderStatus(), DEFAULT_INITIAL_EXPECTED_ENTRIES);
+ return res;
+ }
+
+ public static TsBlockBuilder withMaxTsBlockSize(int maxTsBlockBytes, List<TSDataType> types) {
+ return new TsBlockBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxTsBlockBytes, types);
+ }
+
+ private TsBlockBuilder(int initialExpectedEntries, int maxTsBlockBytes, List<TSDataType> types) {
+ this.types = requireNonNull(types, "types is null");
+
+ tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+ timeColumnBuilder =
+ new TimeColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ valueColumnBuilders = new ColumnBuilder[types.size()];
+
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ // TODO use Type interface to encapsulate createColumnBuilder to each concrete type class
+ // instead of switch-case
+ switch (types.get(i)) {
+ case BOOLEAN:
+ valueColumnBuilders[i] =
+ new BooleanColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case INT32:
+ valueColumnBuilders[i] =
+ new IntColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case INT64:
+ valueColumnBuilders[i] =
+ new LongColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case FLOAT:
+ valueColumnBuilders[i] =
+ new FloatColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case DOUBLE:
+ valueColumnBuilders[i] =
+ new DoubleColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case TEXT:
+ valueColumnBuilders[i] =
+ new BinaryColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown data type: " + types.get(i));
+ }
+ }
+ }
+
+ private TsBlockBuilder(
+ int maxTsBlockBytes,
+ List<TSDataType> types,
+ TimeColumnBuilder templateTimeColumnBuilder,
+ ColumnBuilder[] templateValueColumnBuilders) {
+ this.types = requireNonNull(types, "types is null");
+
+ tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+ valueColumnBuilders = new ColumnBuilder[types.size()];
+
+ checkArgument(
+ templateValueColumnBuilders.length == types.size(),
+ "Size of templates and types should match");
+ timeColumnBuilder =
+ (TimeColumnBuilder)
+ templateTimeColumnBuilder.newColumnBuilderLike(
+ tsBlockBuilderStatus.createColumnBuilderStatus());
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ valueColumnBuilders[i] =
+ templateValueColumnBuilders[i].newColumnBuilderLike(
+ tsBlockBuilderStatus.createColumnBuilderStatus());
+ }
+ }
+
+ public void buildValueColumnBuilders(List<TSDataType> types) {
+ this.types = requireNonNull(types, "types is null");
+ valueColumnBuilders = new ColumnBuilder[types.size()];
+ int initialExpectedEntries = timeColumnBuilder.getPositionCount();
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ // TODO use Type interface to encapsulate createColumnBuilder to each concrete type class
+ // instead of switch-case
+ switch (types.get(i)) {
+ case BOOLEAN:
+ valueColumnBuilders[i] =
+ new BooleanColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case INT32:
+ valueColumnBuilders[i] =
+ new IntColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case INT64:
+ valueColumnBuilders[i] =
+ new LongColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case FLOAT:
+ valueColumnBuilders[i] =
+ new FloatColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case DOUBLE:
+ valueColumnBuilders[i] =
+ new DoubleColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ case TEXT:
+ valueColumnBuilders[i] =
+ new BinaryColumnBuilder(
+ tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown data type: " + types.get(i));
+ }
+ }
+ }
+
+ public void reset() {
+ if (isEmpty()) {
+ return;
+ }
+ tsBlockBuilderStatus =
+ new TsBlockBuilderStatus(tsBlockBuilderStatus.getMaxTsBlockSizeInBytes());
+
+ declaredPositions = 0;
+
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ valueColumnBuilders[i] =
+ valueColumnBuilders[i].newColumnBuilderLike(
+ tsBlockBuilderStatus.createColumnBuilderStatus());
+ }
+ }
+
+ public TsBlockBuilder newTsBlockBuilderLike() {
+ return new TsBlockBuilder(
+ tsBlockBuilderStatus.getMaxTsBlockSizeInBytes(),
+ types,
+ timeColumnBuilder,
+ valueColumnBuilders);
+ }
+
+ public TimeColumnBuilder getTimeColumnBuilder() {
+ return timeColumnBuilder;
+ }
+
+ public ColumnBuilder getColumnBuilder(int channel) {
+ return valueColumnBuilders[channel];
+ }
+
+ public TSDataType getType(int channel) {
+ return types.get(channel);
+ }
+
+ public void declarePosition() {
+ declaredPositions++;
+ }
+
+ public void declarePositions(int positions) {
+ declaredPositions += positions;
+ }
+
+ public boolean isFull() {
+ return declaredPositions == Integer.MAX_VALUE || tsBlockBuilderStatus.isFull();
+ }
+
+ public boolean isEmpty() {
+ return declaredPositions == 0;
+ }
+
+ public int getPositionCount() {
+ return declaredPositions;
+ }
+
+ public long getSizeInBytes() {
+ return tsBlockBuilderStatus.getSizeInBytes();
+ }
+
+ public long getRetainedSizeInBytes() {
+ // We use a foreach loop instead of streams
+ // as it has much better performance.
+ long retainedSizeInBytes = timeColumnBuilder.getRetainedSizeInBytes();
+ for (ColumnBuilder columnBuilder : valueColumnBuilders) {
+ retainedSizeInBytes += columnBuilder.getRetainedSizeInBytes();
+ }
+ return retainedSizeInBytes;
+ }
+
+ public TsBlock build() {
+ if (valueColumnBuilders.length == 0) {
+ return new TsBlock(declaredPositions);
+ }
+ TimeColumn timeColumn = (TimeColumn) timeColumnBuilder.build();
+ if (timeColumn.getPositionCount() != declaredPositions) {
+ throw new IllegalStateException(
+ format(
+ "Declared positions (%s) does not match time column's number of entries (%s)",
+ declaredPositions, timeColumn.getPositionCount()));
+ }
+
+ Column[] columns = new Column[valueColumnBuilders.length];
+ for (int i = 0; i < columns.length; i++) {
+ columns[i] = valueColumnBuilders[i].build();
+ if (columns[i].getPositionCount() != declaredPositions) {
+ throw new IllegalStateException(
+ format(
+ "Declared positions (%s) does not match column %s's number of entries (%s)",
+ declaredPositions, i, columns[i].getPositionCount()));
+ }
+ }
+
+ return TsBlock.wrapBlocksWithoutCopy(declaredPositions, timeColumn, columns);
+ }
+
+ private static void checkArgument(boolean expression, String errorMessage) {
+ if (!expression) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
new file mode 100644
index 0000000..a48e9cd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilderStatus;
+
+public class TsBlockBuilderStatus {
+
+ public static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = 1024 * 1024;
+
+ private final int maxTsBlockSizeInBytes;
+
+ private long currentSize;
+
+ public TsBlockBuilderStatus() {
+ this(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ }
+
+ public TsBlockBuilderStatus(int maxTsBlockSizeInBytes) {
+ this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+ }
+
+ public ColumnBuilderStatus createColumnBuilderStatus() {
+ return new ColumnBuilderStatus(this);
+ }
+
+ public int getMaxTsBlockSizeInBytes() {
+ return maxTsBlockSizeInBytes;
+ }
+
+ public boolean isEmpty() {
+ return currentSize == 0;
+ }
+
+ public boolean isFull() {
+ return currentSize >= maxTsBlockSizeInBytes;
+ }
+
+ public void addBytes(int bytes) {
+ if (bytes < 0) {
+ throw new IllegalArgumentException("bytes cannot be negative");
+ }
+ currentSize += bytes;
+ }
+
+ public long getSizeInBytes() {
+ return currentSize;
+ }
+
+ @Override
+ public String toString() {
+ return "TsBlockBuilderStatus{"
+ + "maxTsBlockSizeInBytes="
+ + maxTsBlockSizeInBytes
+ + ", currentSize="
+ + currentSize
+ + '}';
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
similarity index 97%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
index 7d3a5dd..0f16f1e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.common;
+package org.apache.iotdb.tsfile.read.common.block;
import java.util.List;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
new file mode 100644
index 0000000..cfb22c9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BinaryColumn implements Column {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(BinaryColumn.class).instanceSize();
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final Binary[] values;
+
+ private final long retainedSizeInBytes;
+
+ public BinaryColumn(int positionCount, Optional<boolean[]> valueIsNull, Binary[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ BinaryColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, Binary[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ // TODO we need to sum up all the Binary's retainedSize here
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ }
+
+ @Override
+ public Binary getBinary(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsBinary(getBinary(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
new file mode 100644
index 0000000..c5c629f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BinaryColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(BinaryColumnBuilder.class).instanceSize();
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private static final BinaryColumn NULL_VALUE_BLOCK =
+ new BinaryColumn(0, 1, new boolean[] {true}, new Binary[1]);
+
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private Binary[] values = new Binary[0];
+
+ private long arraysRetainedSizeInBytes;
+
+ public BinaryColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.initialEntryCount = max(expectedEntries, 1);
+ this.columnBuilderStatus = columnBuilderStatus;
+ updateArraysDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeBinary(Binary value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeBinary(value.getBinary());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ BinaryColumn column = (BinaryColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeBinary(column.getBinary(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new BinaryColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ // TODO we need to sum up all the Binary's retainedSize here
+ long size = INSTANCE_SIZE + arraysRetainedSizeInBytes;
+ if (columnBuilderStatus != null) {
+ size += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ return size;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ // TODO we should take retain size into account here
+ return new BinaryColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateArraysDataSize();
+ }
+
+ private void updateArraysDataSize() {
+ arraysRetainedSizeInBytes = sizeOf(valueIsNull) + sizeOf(values);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
new file mode 100644
index 0000000..a2b8550
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BooleanColumn implements Column {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(BooleanColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Byte.BYTES + Byte.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final boolean[] values;
+
+ private final long retainedSizeInBytes;
+
+ public BooleanColumn(int positionCount, Optional<boolean[]> valueIsNull, boolean[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ BooleanColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, boolean[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+ }
+
+ @Override
+ public boolean getBoolean(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsBoolean(getBoolean(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
new file mode 100644
index 0000000..5e7629d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BooleanColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(BooleanColumnBuilder.class).instanceSize();
+ private static final BooleanColumn NULL_VALUE_BLOCK =
+ new BooleanColumn(0, 1, new boolean[] {true}, new boolean[1]);
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private boolean[] values = new boolean[0];
+
+ private long retainedSizeInBytes;
+
+ public BooleanColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeBoolean(boolean value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeBoolean(value.getBoolean());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ BooleanColumn column = (BooleanColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeBoolean(column.getBoolean(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new BooleanColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new BooleanColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
new file mode 100644
index 0000000..adc06a1
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface Column {
+
+ /** Gets a boolean at {@code position}. */
+ default boolean getBoolean(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a little endian int at {@code position}. */
+ default int getInt(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a little endian long at {@code position}. */
+ default long getLong(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a float at {@code position}. */
+ default float getFloat(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a double at {@code position}. */
+ default double getDouble(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a Binary at {@code position}. */
+ default Binary getBinary(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Gets a TsPrimitiveType at {@code position}. */
+ default TsPrimitiveType getTsPrimitiveType(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * Is the specified position null?
+ *
+ * @throws IllegalArgumentException if this position is not valid. The method may return false
+ * without throwing exception when there are no nulls in the block, even if the position is
+ * invalid
+ */
+ boolean isNull(int position);
+
+ /** Returns the number of positions in this block. */
+ int getPositionCount();
+
+ /**
+ * Returns the retained size of this column in memory, including over-allocations. This method is
+ * called from the inner most execution loop and must be fast.
+ */
+ long getRetainedSizeInBytes();
+
+ /**
+ * Returns a column starting at the specified position and extends for the specified length. The
+ * specified region must be entirely contained within this column.
+ *
+ * <p>The region can be a view over this column. If this column is released, the region column may
+ * also be released. If the region column is released, this block may also be released.
+ */
+ Column getRegion(int positionOffset, int length);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
new file mode 100644
index 0000000..a16e5fa
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface ColumnBuilder {
+
+ /** Write a boolean to the current entry; */
+ default ColumnBuilder writeBoolean(boolean value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a short to the current entry; */
+ default ColumnBuilder writeInt(int value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a int to the current entry; */
+ default ColumnBuilder writeLong(long value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a long to the current entry; */
+ default ColumnBuilder writeFloat(float value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a byte sequences to the current entry; */
+ default ColumnBuilder writeDouble(double value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a Binary to the current entry; */
+ default ColumnBuilder writeBinary(Binary value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** Write a TsPrimitiveType sequences to the current entry; */
+ default ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder);
+
+ /** Appends a null value to the block. */
+ ColumnBuilder appendNull();
+
+ /** Builds the block. This method can be called multiple times. */
+ Column build();
+
+ /**
+ * Returns the retained size of this column in memory, including over-allocations. This method is
+ * called from the inner most execution loop and must be fast.
+ */
+ long getRetainedSizeInBytes();
+
+ /**
+ * Creates a new column builder of the same type based on the current usage statistics of this
+ * column builder.
+ */
+ ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
new file mode 100644
index 0000000..d0f62a2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnBuilderStatus {
+
+ public static final int INSTANCE_SIZE = deepInstanceSize(ColumnBuilderStatus.class);
+
+ private final TsBlockBuilderStatus tsBlockBuilderStatus;
+
+ private int currentSize;
+
+ public ColumnBuilderStatus(TsBlockBuilderStatus tsBlockBuilderStatus) {
+ this.tsBlockBuilderStatus =
+ requireNonNull(tsBlockBuilderStatus, "tsBlockBuilderStatus must not be null");
+ }
+
+ public int getMaxTsBlockSizeInBytes() {
+ return tsBlockBuilderStatus.getMaxTsBlockSizeInBytes();
+ }
+
+ public void addBytes(int bytes) {
+ currentSize += bytes;
+ tsBlockBuilderStatus.addBytes(bytes);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnBuilderStatus{" + ", currentSize=" + currentSize + '}';
+ }
+
+ /**
+ * Computes the size of an instance of this class assuming that all reference fields are non-null
+ */
+ private static int deepInstanceSize(Class<?> clazz) {
+ if (clazz.isArray()) {
+ throw new IllegalArgumentException(
+ format(
+ "Cannot determine size of %s because it contains an array", clazz.getSimpleName()));
+ }
+ if (clazz.isInterface()) {
+ throw new IllegalArgumentException(format("%s is an interface", clazz.getSimpleName()));
+ }
+ if (Modifier.isAbstract(clazz.getModifiers())) {
+ throw new IllegalArgumentException(format("%s is abstract", clazz.getSimpleName()));
+ }
+ if (!clazz.getSuperclass().equals(Object.class)) {
+ throw new IllegalArgumentException(
+ format(
+ "Cannot determine size of a subclass. %s extends from %s",
+ clazz.getSimpleName(), clazz.getSuperclass().getSimpleName()));
+ }
+
+ int size = ClassLayout.parseClass(clazz).instanceSize();
+ for (Field field : clazz.getDeclaredFields()) {
+ if (!field.getType().isPrimitive()) {
+ size += deepInstanceSize(field.getType());
+ }
+ }
+ return size;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
new file mode 100644
index 0000000..150f713
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import static java.lang.Math.ceil;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnUtil {
+
+ private static final double BLOCK_RESET_SKEW = 1.25;
+
+ private static final int DEFAULT_CAPACITY = 64;
+ // See java.util.ArrayList for an explanation
+ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ private ColumnUtil() {}
+
+ static void checkArrayRange(int[] array, int offset, int length) {
+ requireNonNull(array, "array is null");
+ if (offset < 0 || length < 0 || offset + length > array.length) {
+ throw new IndexOutOfBoundsException(
+ format(
+ "Invalid offset %s and length %s in array with %s elements",
+ offset, length, array.length));
+ }
+ }
+
+ static void checkValidRegion(int positionCount, int positionOffset, int length) {
+ if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
+ throw new IndexOutOfBoundsException(
+ format(
+ "Invalid position %s and length %s in block with %s positions",
+ positionOffset, length, positionCount));
+ }
+ }
+
+ static void checkValidPositions(boolean[] positions, int positionCount) {
+ if (positions.length != positionCount) {
+ throw new IllegalArgumentException(
+ format(
+ "Invalid positions array size %d, actual position count is %d",
+ positions.length, positionCount));
+ }
+ }
+
+ static void checkValidPosition(int position, int positionCount) {
+ if (position < 0 || position >= positionCount) {
+ throw new IllegalArgumentException(
+ format("Invalid position %s in block with %s positions", position, positionCount));
+ }
+ }
+
+ static int calculateNewArraySize(int currentSize) {
+ // grow array by 50%
+ long newSize = (long) currentSize + (currentSize >> 1);
+
+ // verify new size is within reasonable bounds
+ if (newSize < DEFAULT_CAPACITY) {
+ newSize = DEFAULT_CAPACITY;
+ } else if (newSize > MAX_ARRAY_SIZE) {
+ newSize = MAX_ARRAY_SIZE;
+ if (newSize == currentSize) {
+ throw new IllegalArgumentException(format("Cannot grow array beyond '%s'", MAX_ARRAY_SIZE));
+ }
+ }
+ return (int) newSize;
+ }
+
+ static int calculateBlockResetSize(int currentSize) {
+ long newSize = (long) ceil(currentSize * BLOCK_RESET_SKEW);
+
+ // verify new size is within reasonable bounds
+ if (newSize < DEFAULT_CAPACITY) {
+ newSize = DEFAULT_CAPACITY;
+ } else if (newSize > MAX_ARRAY_SIZE) {
+ newSize = MAX_ARRAY_SIZE;
+ }
+ return (int) newSize;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
new file mode 100644
index 0000000..d16e0c0
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumn implements Column {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(DoubleColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Double.BYTES + Byte.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final double[] values;
+
+ private final long retainedSizeInBytes;
+
+ public DoubleColumn(int positionCount, Optional<boolean[]> valueIsNull, double[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ DoubleColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, double[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+ }
+
+ @Override
+ public double getDouble(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsDouble(getDouble(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
new file mode 100644
index 0000000..a01f6f2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(DoubleColumnBuilder.class).instanceSize();
+ private static final DoubleColumn NULL_VALUE_BLOCK =
+ new DoubleColumn(0, 1, new boolean[] {true}, new double[1]);
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private double[] values = new double[0];
+
+ private long retainedSizeInBytes;
+
+ public DoubleColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeDouble(double value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeDouble(value.getDouble());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ DoubleColumn column = (DoubleColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeDouble(column.getDouble(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new DoubleColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new DoubleColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
new file mode 100644
index 0000000..6543bde
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumn implements Column {
+
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(FloatColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Float.BYTES + Byte.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final float[] values;
+
+ private final long retainedSizeInBytes;
+
+ public FloatColumn(int positionCount, Optional<boolean[]> valueIsNull, float[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ FloatColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, float[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+ }
+
+ @Override
+ public float getFloat(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsFloat(getFloat(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
new file mode 100644
index 0000000..ff89ea2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(FloatColumnBuilder.class).instanceSize();
+ private static final FloatColumn NULL_VALUE_BLOCK =
+ new FloatColumn(0, 1, new boolean[] {true}, new float[1]);
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private float[] values = new float[0];
+
+ private long retainedSizeInBytes;
+
+ public FloatColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeFloat(float value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeFloat(value.getFloat());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ FloatColumn column = (FloatColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeFloat(column.getFloat(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new FloatColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new FloatColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
new file mode 100644
index 0000000..4ad843f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class IntColumn implements Column {
+
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(IntColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES + Byte.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final int[] values;
+
+ private final long retainedSizeInBytes;
+
+ public IntColumn(int positionCount, Optional<boolean[]> valueIsNull, int[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ IntColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, int[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ }
+
+ @Override
+ public int getInt(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsInt(getInt(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
new file mode 100644
index 0000000..9ab3f7f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class IntColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(IntColumnBuilder.class).instanceSize();
+ private static final IntColumn NULL_VALUE_BLOCK =
+ new IntColumn(0, 1, new boolean[] {true}, new int[1]);
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private int[] values = new int[0];
+
+ private long retainedSizeInBytes;
+
+ public IntColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeInt(int value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeInt(value.getInt());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ IntColumn column = (IntColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeInt(column.getInt(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new IntColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new IntColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
new file mode 100644
index 0000000..9ab9edd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class LongColumn implements Column {
+
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES + Byte.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final boolean[] valueIsNull;
+ private final long[] values;
+
+ private final long retainedSizeInBytes;
+
+ public LongColumn(int positionCount, Optional<boolean[]> valueIsNull, long[] values) {
+ this(0, positionCount, valueIsNull.orElse(null), values);
+ }
+
+ LongColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, long[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("isNull length is less than positionCount");
+ }
+ this.valueIsNull = valueIsNull;
+
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ }
+
+ @Override
+ public long getLong(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return new TsPrimitiveType.TsLong(getLong(position));
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return valueIsNull != null && valueIsNull[position + arrayOffset];
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
new file mode 100644
index 0000000..9053913
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class LongColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(LongColumnBuilder.class).instanceSize();
+ private static final LongColumn NULL_VALUE_BLOCK =
+ new LongColumn(0, 1, new boolean[] {true}, new long[1]);
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+ private boolean hasNullValue;
+ private boolean hasNonNullValue;
+
+ // it is assumed that these arrays are the same length
+ private boolean[] valueIsNull = new boolean[0];
+ private long[] values = new long[0];
+
+ private long retainedSizeInBytes;
+
+ public LongColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeLong(long value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ hasNonNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+ return writeLong(value.getLong());
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ int count = timeBuilder.getPositionCount();
+ int index = offset;
+ LongColumn column = (LongColumn) valueColumn;
+ for (int i = 0; i < count; i++) {
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ writeLong(column.getLong(index++));
+ } else {
+ appendNull();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ valueIsNull[positionCount] = true;
+
+ hasNullValue = true;
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public Column build() {
+ if (!hasNonNullValue) {
+ return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+ }
+ return new LongColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new LongColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
new file mode 100644
index 0000000..3977500
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+
+public class RunLengthEncodedColumn implements Column {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(RunLengthEncodedColumn.class).instanceSize();
+
+ private final Column value;
+ private final int positionCount;
+
+ public RunLengthEncodedColumn(Column value, int positionCount) {
+ requireNonNull(value, "value is null");
+ if (value.getPositionCount() != 1) {
+ throw new IllegalArgumentException(
+ format(
+ "Expected value to contain a single position but has %s positions",
+ value.getPositionCount()));
+ }
+
+ if (value instanceof RunLengthEncodedColumn) {
+ this.value = ((RunLengthEncodedColumn) value).getValue();
+ } else {
+ this.value = value;
+ }
+
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+
+ this.positionCount = positionCount;
+ }
+
+ public Column getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean getBoolean(int position) {
+ checkReadablePosition(position);
+ return value.getBoolean(position);
+ }
+
+ @Override
+ public int getInt(int position) {
+ checkReadablePosition(position);
+ return value.getInt(position);
+ }
+
+ @Override
+ public long getLong(int position) {
+ checkReadablePosition(position);
+ return value.getLong(position);
+ }
+
+ @Override
+ public float getFloat(int position) {
+ checkReadablePosition(position);
+ return value.getFloat(position);
+ }
+
+ @Override
+ public double getDouble(int position) {
+ checkReadablePosition(position);
+ return value.getDouble(position);
+ }
+
+ @Override
+ public Binary getBinary(int position) {
+ checkReadablePosition(position);
+ return value.getBinary(position);
+ }
+
+ @Override
+ public TsPrimitiveType getTsPrimitiveType(int position) {
+ checkReadablePosition(position);
+ return value.getTsPrimitiveType(position);
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ checkReadablePosition(position);
+ return value.isNull(0);
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return INSTANCE_SIZE + value.getRetainedSizeInBytes();
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(positionCount, positionOffset, length);
+ return new RunLengthEncodedColumn(value, length);
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= positionCount) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
new file mode 100644
index 0000000..c8d4748
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class TimeColumn implements Column {
+
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumn.class).instanceSize();
+ public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES;
+
+ private final int arrayOffset;
+ private final int positionCount;
+ private final long[] values;
+
+ private final long retainedSizeInBytes;
+
+ public TimeColumn(int positionCount, long[] values) {
+ this(0, positionCount, values);
+ }
+
+ TimeColumn(int arrayOffset, int positionCount, long[] values) {
+ if (arrayOffset < 0) {
+ throw new IllegalArgumentException("arrayOffset is negative");
+ }
+ this.arrayOffset = arrayOffset;
+ if (positionCount < 0) {
+ throw new IllegalArgumentException("positionCount is negative");
+ }
+ this.positionCount = positionCount;
+
+ if (values.length - arrayOffset < positionCount) {
+ throw new IllegalArgumentException("values length is less than positionCount");
+ }
+ this.values = values;
+
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
+ }
+
+ @Override
+ public long getLong(int position) {
+ checkReadablePosition(position);
+ return values[position + arrayOffset];
+ }
+
+ @Override
+ public boolean isNull(int position) {
+ return false;
+ }
+
+ @Override
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public Column getRegion(int positionOffset, int length) {
+ checkValidRegion(getPositionCount(), positionOffset, length);
+ return new TimeColumn(positionOffset + arrayOffset, length, values);
+ }
+
+ public long getEndTime() {
+ return values[getPositionCount() + arrayOffset - 1];
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
new file mode 100644
index 0000000..9b455cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class TimeColumnBuilder implements ColumnBuilder {
+
+ private static final int INSTANCE_SIZE =
+ ClassLayout.parseClass(TimeColumnBuilder.class).instanceSize();
+
+ private final ColumnBuilderStatus columnBuilderStatus;
+ private boolean initialized;
+ private final int initialEntryCount;
+
+ private int positionCount;
+
+ private long[] values = new long[0];
+
+ private long retainedSizeInBytes;
+
+ public TimeColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+ this.columnBuilderStatus = columnBuilderStatus;
+ this.initialEntryCount = max(expectedEntries, 1);
+
+ updateDataSize();
+ }
+
+ @Override
+ public ColumnBuilder writeLong(long value) {
+ if (values.length <= positionCount) {
+ growCapacity();
+ }
+
+ values[positionCount] = value;
+
+ positionCount++;
+ if (columnBuilderStatus != null) {
+ columnBuilderStatus.addBytes(TimeColumn.SIZE_IN_BYTES_PER_POSITION);
+ }
+ return this;
+ }
+
+ @Override
+ public int appendColumn(
+ TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ @Override
+ public ColumnBuilder appendNull() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ @Override
+ public Column build() {
+ return new TimeColumn(0, positionCount, values);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return retainedSizeInBytes;
+ }
+
+ @Override
+ public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+ return new TimeColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+ }
+
+ public int getPositionCount() {
+ return positionCount;
+ }
+
+ public long getTime(int position) {
+ checkReadablePosition(position);
+ return values[position];
+ }
+
+ private void growCapacity() {
+ int newSize;
+ if (initialized) {
+ newSize = ColumnUtil.calculateNewArraySize(values.length);
+ } else {
+ newSize = initialEntryCount;
+ initialized = true;
+ }
+
+ values = Arrays.copyOf(values, newSize);
+ updateDataSize();
+ }
+
+ private void updateDataSize() {
+ retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
+ if (columnBuilderStatus != null) {
+ retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+ }
+ }
+
+ private void checkReadablePosition(int position) {
+ if (position < 0 || position >= getPositionCount()) {
+ throw new IllegalArgumentException("position is not valid");
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index db76550..3affee8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.tsfile.read.reader;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 2efe6802..dd75b21 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -25,7 +25,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class AlignedPageReader implements IPageReader, IAlignedPageReader {
@@ -105,8 +107,38 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
@Override
public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
- // TODO need to implement for mpp
- throw new IllegalStateException("We have not implemented this method.");
+ // TODO change from the row-based style to column-based style
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ valuePageReaderList.stream()
+ .map(ValuePageReader::getDataType)
+ .collect(Collectors.toList()));
+ int timeIndex = -1;
+ while (timePageReader.hasNextTime()) {
+ long timestamp = timePageReader.nextTime();
+ timeIndex++;
+ // if all the sub sensors' value are null in current row, just discard it
+ boolean isNull = true;
+ Object notNullObject = null;
+ TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
+ for (int i = 0; i < v.length; i++) {
+ ValuePageReader pageReader = valuePageReaderList.get(i);
+ v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
+ if (v[i] != null) {
+ isNull = false;
+ notNullObject = v[i].getValue();
+ }
+ }
+ // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
+ // sensor
+ if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
+ builder.getTimeColumnBuilder().writeLong(timestamp);
+ for (int i = 0; i < v.length; i++) {
+ builder.getColumnBuilder(i).writeTsPrimitiveType(v[i]);
+ }
+ }
+ }
+ return builder.build();
}
public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index c1a4c59..ec5ff85 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -26,7 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
@@ -35,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
public class PageReader implements IPageReader {
@@ -157,53 +161,76 @@ public class PageReader implements IPageReader {
@Override
public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
// TODO we still need to consider data type, ascending and descending here
- TsBlock tsBlock = new TsBlock();
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
if (filter == null || filter.satisfy(getStatistics())) {
- while (timeDecoder.hasNext(timeBuffer)) {
- long timestamp = timeDecoder.readLong(timeBuffer);
- switch (dataType) {
- case BOOLEAN:
+ switch (dataType) {
+ case BOOLEAN:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
- // tsBlock.putBoolean(timestamp, aBoolean);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeBoolean(aBoolean);
}
- break;
- case INT32:
+ }
+ break;
+ case INT32:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
int anInt = valueDecoder.readInt(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
- // tsBlock.putInt(timestamp, anInt);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeInt(anInt);
}
- break;
- case INT64:
+ }
+ break;
+ case INT64:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
long aLong = valueDecoder.readLong(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
- // tsBlock.putLong(timestamp, aLong);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeLong(aLong);
}
- break;
- case FLOAT:
+ }
+ break;
+ case FLOAT:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
float aFloat = valueDecoder.readFloat(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
- // tsBlock.putFloat(timestamp, aFloat);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeFloat(aFloat);
}
- break;
- case DOUBLE:
+ }
+ break;
+ case DOUBLE:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
double aDouble = valueDecoder.readDouble(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
- // tsBlock.putDouble(timestamp, aDouble);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeDouble(aDouble);
}
- break;
- case TEXT:
+ }
+ break;
+ case TEXT:
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
Binary aBinary = valueDecoder.readBinary(valueBuffer);
if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
- // tsBlock.putBinary(timestamp, aBinary);
+ timeBuilder.writeLong(timestamp);
+ valueBuilder.writeBinary(aBinary);
}
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
- }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
- return tsBlock;
+ return tsBlockBuilder.build();
}
@Override