You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/10/30 07:32:32 UTC
[incubator-iotdb] 01/01: fix create schema bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_chunk_buffer
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit f3e8f17efcd278fd51b8ca1d7a56e8527489069a
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Oct 30 15:04:56 2019 +0800
fix create schema bug
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 8 +-
.../iotdb/db/engine/memtable/ChunkBufferPool.java | 1 +
.../db/engine/merge/manage/MergeResource.java | 8 +-
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
.../org/apache/iotdb/db/metadata/MManager.java | 189 +++++++++++----------
.../db/qp/executor/IQueryProcessExecutor.java | 2 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 2 +-
.../iotdb/db/qp/strategy/LogicalGenerator.java | 14 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 2 +-
.../db/query/dataset/DeviceIterateDataSet.java | 2 +-
.../db/query/executor/FillEngineExecutor.java | 2 +-
.../db/query/executor/IEngineQueryRouter.java | 2 +-
.../java/org/apache/iotdb/db/query/fill/IFill.java | 2 +-
.../org/apache/iotdb/db/query/fill/LinearFill.java | 2 +-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 2 +-
.../db/utils/datastructure/BooleanTVList.java | 2 +-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 2 +-
.../iotdb/db/utils/datastructure/FloatTVList.java | 2 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 2 +-
.../iotdb/db/utils/datastructure/LongTVList.java | 2 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 4 +-
.../org/apache/iotdb/db/sql/TqlParserTest.java | 12 +-
.../org/apache/iotdb/sparkdb/Transformer.scala | 2 +-
.../apache/iotdb/spark/tsfile/Transformer.scala | 2 +-
.../tsfile/encoding/decoder/GorillaDecoder.java | 2 +-
.../file/metadata/statistics/BinaryStatistics.java | 8 +-
.../metadata/statistics/BooleanStatistics.java | 4 +-
.../file/metadata/statistics/DoubleStatistics.java | 4 +-
.../file/metadata/statistics/FloatStatistics.java | 4 +-
.../metadata/statistics/IntegerStatistics.java | 4 +-
.../file/metadata/statistics/LongStatistics.java | 4 +-
.../file/metadata/statistics/NoStatistics.java | 4 +-
.../file/metadata/statistics/Statistics.java | 14 +-
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 9 +-
.../tsfile/write/chunk/ChunkGroupWriterImpl.java | 3 +-
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 165 ++++++++----------
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 11 +-
38 files changed, 244 insertions(+), 264 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4eb5576..22ce18c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -294,7 +294,7 @@ public class IoTDBConfig {
/**
* Switch of creating schema automatically
*/
- private boolean enableAutoCreateSchema = false;
+ private boolean enableAutoCreateSchema = true;
/**
* Storage group level when creating schema automatically is enabled
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 992e2cd..e60be09 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -24,11 +24,8 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.Schema;
@@ -40,7 +37,6 @@ import org.slf4j.LoggerFactory;
public class MemTableFlushTask {
private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
- private static final int PAGE_SIZE_THRESHOLD = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
.getInstance();
private Future ioTaskFuture;
@@ -167,9 +163,7 @@ public class MemTableFlushTask {
} else {
long starTime = System.currentTimeMillis();
Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = ChunkBufferPool.getInstance()
- .getEmptyChunkBuffer(this, encodingMessage.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(chunkBuffer, PAGE_SIZE_THRESHOLD);
+ IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
ioTaskQueue.add(seriesWriter);
memSerializeTime += System.currentTimeMillis() - starTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index 49c5dd5..228f918 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
* high-cost GC. In new design, we try to reuse ChunkBuffer objects by ChunkBufferPool, referring to
* {@linkplain MemTablePool}.
*/
+@Deprecated
public class ChunkBufferPool {
private static final Logger logger = LoggerFactory.getLogger(ChunkBufferPool.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index f418cb6..f4d91c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -169,22 +169,16 @@ public class MergeResource {
/**
* Construct a new or get an existing ChunkWriter of a measurement. Different timeseries of
* the same measurement shares the same instance.
- * @param measurementSchema
- * @return
*/
public IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) {
- return chunkWriterCache.computeIfAbsent(measurementSchema,
- schema -> new ChunkWriterImpl(new ChunkBuffer(schema),
- TSFileDescriptor.getInstance().getConfig().getPageCheckSizeThreshold()));
+ return chunkWriterCache.computeIfAbsent(measurementSchema, ChunkWriterImpl::new);
}
/**
* Get the modifications of a timeseries in the ModificationFile of a TsFile. Once the
* modifications of the timeseries are found out, they will be removed from the list to boost
* the next query, so two calls of the same file and timeseries are forbidden.
- * @param tsFileResource
* @param path name of the time series
- * @return
*/
public List<Modification> getModifications(TsFileResource tsFileResource, Path path) {
// copy from TsFileResource so queries are not affected
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e85f2fc..a4cad5a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -788,7 +788,7 @@ public class StorageGroupProcessor {
/**
* @param tsFileResources includes sealed and unsealed tsfile resources
- * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
+ * @return deserialize unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
*/
private List<TsFileResource> getFileReSourceListForQuery(List<TsFileResource> tsFileResources,
String deviceId, String measurementId, QueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index e6d255c..739cd10 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -282,7 +282,7 @@ public class MManager {
}
/**
- * <p> Add one timeseries to metadata.
+ * <p> Add one timeseries to the metadata tree. Do nothing if it already exists.
*
* @param path the timeseries seriesPath
* @param dataType the dataType {@code DataType} for the timeseries
@@ -292,107 +292,111 @@ public class MManager {
* measurement should be registered to the StorageEngine too)
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- public boolean addPathToMTree(Path path, TSDataType dataType, TSEncoding encoding,
+ public synchronized boolean addPathToMTree(Path path, TSDataType dataType, TSEncoding encoding,
CompressionType compressor, Map<String, String> props)
throws MetadataErrorException, PathErrorException {
- if (pathExist(path.getFullPath())) {
- throw new MetadataErrorException(
- String.format("Timeseries %s already exist", path.getFullPath()));
- }
- IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- if (!checkStorageGroupByPath(path.getFullPath())) {
- if (!conf.isAutoCreateSchemaEnabled()) {
- throw new MetadataErrorException("Storage group should be created first");
- }
- String storageGroupName = getStorageGroupNameByAutoLevel(
- path.getFullPath(), conf.getDefaultStorageGroupLevel());
- setStorageGroupToMTree(storageGroupName);
- }
- // optimize the speed of adding timeseries
- String fileNodePath;
- try {
- fileNodePath = getStorageGroupNameByPath(path.getFullPath());
- } catch (StorageGroupException e) {
- throw new MetadataErrorException(e);
- }
+ lock.writeLock().lock();
try {
- IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
- } catch (ConfigAdjusterException e) {
- throw new MetadataErrorException(e);
- }
- // the two map is stored in the storage group node
- Map<String, MeasurementSchema> schemaMap = getStorageGroupSchemaMap(fileNodePath);
- Map<String, Integer> numSchemaMap = getStorageGroupNumSchemaMap(fileNodePath);
- String lastNode = path.getMeasurement();
- boolean isNewMeasurement = true;
- // Thread safety: just one thread can access/modify the schemaMap
- synchronized (schemaMap) {
- // Need to check the path again to avoid duplicated inserting by multi concurrent threads
+ // double check
if (pathExist(path.getFullPath())) {
- throw new MetadataErrorException(
- String.format("Timeseries %s already exist", path.getFullPath()));
+ logger.debug("Timeseries {} already exists", path.getFullPath());
+ return true;
}
- if (schemaMap.containsKey(lastNode)) {
- isNewMeasurement = false;
- MeasurementSchema columnSchema = schemaMap.get(lastNode);
- if (!columnSchema.getType().equals(dataType)
- || !columnSchema.getEncodingType().equals(encoding)) {
- throw new MetadataErrorException(String.format(
- "The resultDataType or encoding of the last node %s is conflicting "
- + "in the storage group %s", lastNode, fileNodePath));
+ IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ if (!checkStorageGroupByPath(path.getFullPath())) {
+ if (!conf.isAutoCreateSchemaEnabled()) {
+ throw new MetadataErrorException("Storage group should be created first");
}
- try {
- addPathToMTreeInternal(path.getFullPath(), dataType, encoding, compressor, props);
- } catch (IOException | PathErrorException | StorageGroupException e) {
- throw new MetadataErrorException(e);
- }
- numSchemaMap.put(lastNode, numSchemaMap.get(lastNode) + 1);
- } else {
- try {
- addPathToMTreeInternal(path.getFullPath(), dataType, encoding, compressor, props);
- } catch (PathErrorException | IOException | StorageGroupException e) {
- throw new MetadataErrorException(e);
+ String storageGroupName = getStorageGroupNameByAutoLevel(
+ path.getFullPath(), conf.getDefaultStorageGroupLevel());
+ setStorageGroupToMTree(storageGroupName);
+ }
+ // optimize the speed of adding timeseries
+ String fileNodePath;
+ try {
+ fileNodePath = getStorageGroupNameByPath(path.getFullPath());
+ } catch (StorageGroupException e) {
+ throw new MetadataErrorException(e);
+ }
+ try {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
+ } catch (ConfigAdjusterException e) {
+ throw new MetadataErrorException(e);
+ }
+ // the two map is stored in the storage group node
+ Map<String, MeasurementSchema> schemaMap = getStorageGroupSchemaMap(fileNodePath);
+ Map<String, Integer> numSchemaMap = getStorageGroupNumSchemaMap(fileNodePath);
+ String lastNode = path.getMeasurement();
+ boolean isNewMeasurement = true;
+ // Thread safety: just one thread can access/modify the schemaMap
+ synchronized (schemaMap) {
+ // Need to check the path again to avoid duplicated inserting by multi concurrent threads
+ if (pathExist(path.getFullPath())) {
+ throw new MetadataErrorException(
+ String.format("Timeseries %s already exist", path.getFullPath()));
}
- MeasurementSchema columnSchema;
- try {
- columnSchema = getSchemaForOnePath(path.toString());
- } catch (PathErrorException e) {
- throw new MetadataErrorException(e);
+ if (schemaMap.containsKey(lastNode)) {
+ isNewMeasurement = false;
+ MeasurementSchema columnSchema = schemaMap.get(lastNode);
+ if (!columnSchema.getType().equals(dataType)
+ || !columnSchema.getEncodingType().equals(encoding)) {
+ throw new MetadataErrorException(String.format(
+ "The resultDataType or encoding of the last node %s is conflicting "
+ + "in the storage group %s", lastNode, fileNodePath));
+ }
+ try {
+ addPathToMTreeInternal(path.getFullPath(), dataType, encoding, compressor, props);
+ } catch (IOException | PathErrorException | StorageGroupException e) {
+ throw new MetadataErrorException(e);
+ }
+ numSchemaMap.put(lastNode, numSchemaMap.get(lastNode) + 1);
+ } else {
+ try {
+ addPathToMTreeInternal(path.getFullPath(), dataType, encoding, compressor, props);
+ } catch (PathErrorException | IOException | StorageGroupException e) {
+ throw new MetadataErrorException(e);
+ }
+ MeasurementSchema columnSchema;
+ try {
+ columnSchema = getSchemaForOnePath(path.toString());
+ } catch (PathErrorException e) {
+ throw new MetadataErrorException(e);
+ }
+ schemaMap.put(lastNode, columnSchema);
+ numSchemaMap.put(lastNode, 1);
}
- schemaMap.put(lastNode, columnSchema);
- numSchemaMap.put(lastNode, 1);
+ return isNewMeasurement;
}
- return isNewMeasurement;
+ } finally {
+ lock.writeLock().unlock();
}
}
+ /**
+ * path will be added to mgraph with no check
+ */
private void addPathToMTreeInternal(String path, TSDataType dataType, TSEncoding encoding,
CompressionType compressor, Map<String, String> props)
throws PathErrorException, IOException, StorageGroupException {
- lock.writeLock().lock();
- try {
- mgraph.addPathToMTree(path, dataType, encoding, compressor, props);
- String storageGroupName = mgraph.getStorageGroupNameByPath(path);
- int size = seriesNumberInStorageGroups.get(storageGroupName);
- seriesNumberInStorageGroups.put(storageGroupName, size + 1);
- if (size + 1 > maxSeriesNumberAmongStorageGroup) {
- maxSeriesNumberAmongStorageGroup = size + 1;
- }
- if (writeToLog) {
- BufferedWriter writer = getLogWriter();
- writer.write(String.format("%s,%s,%s,%s,%s", MetadataOperationType.ADD_PATH_TO_MTREE,
- path, dataType.serialize(), encoding.serialize(), compressor.serialize()));
- if (props != null) {
- for (Map.Entry entry : props.entrySet()) {
- writer.write(String.format(",%s=%s", entry.getKey(), entry.getValue()));
- }
+ mgraph.addPathToMTree(path, dataType, encoding, compressor, props);
+ String storageGroupName = mgraph.getStorageGroupNameByPath(path);
+ int size = seriesNumberInStorageGroups.get(storageGroupName);
+ seriesNumberInStorageGroups.put(storageGroupName, size + 1);
+ if (size + 1 > maxSeriesNumberAmongStorageGroup) {
+ maxSeriesNumberAmongStorageGroup = size + 1;
+ }
+ if (writeToLog) {
+ BufferedWriter writer = getLogWriter();
+ writer.write(String.format("%s,%s,%s,%s,%s", MetadataOperationType.ADD_PATH_TO_MTREE,
+ path, dataType.serialize(), encoding.serialize(), compressor.serialize()));
+ if (props != null) {
+ for (Map.Entry entry : props.entrySet()) {
+ writer.write(String.format(",%s=%s", entry.getKey(), entry.getValue()));
}
- writer.newLine();
- writer.flush();
}
- } finally {
- lock.writeLock().unlock();
+ writer.newLine();
+ writer.flush();
}
}
@@ -408,10 +412,16 @@ public class MManager {
*/
public void addPathToMTree(String path, String dataType, String encoding)
throws PathErrorException, IOException, StorageGroupException {
- TSDataType tsDataType = TSDataType.valueOf(dataType);
- TSEncoding tsEncoding = TSEncoding.valueOf(encoding);
- CompressionType type = CompressionType.valueOf(TSFileDescriptor.getInstance().getConfig().getCompressor());
- addPathToMTreeInternal(path, tsDataType, tsEncoding, type, Collections.emptyMap());
+ lock.writeLock().lock();
+ try {
+ TSDataType tsDataType = TSDataType.valueOf(dataType);
+ TSEncoding tsEncoding = TSEncoding.valueOf(encoding);
+ CompressionType type = CompressionType
+ .valueOf(TSFileDescriptor.getInstance().getConfig().getCompressor());
+ addPathToMTreeInternal(path, tsDataType, tsEncoding, type, Collections.emptyMap());
+ } finally {
+ lock.writeLock().unlock();
+ }
}
/**
@@ -571,6 +581,9 @@ public class MManager {
public void setStorageGroupToMTree(String path) throws MetadataErrorException {
lock.writeLock().lock();
try {
+ if (mgraph.checkStorageGroup(path)) {
+ return;
+ }
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
mgraph.setStorageGroup(path);
seriesNumberInStorageGroups.put(path, 0);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index f8f4de2..f96f178 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -73,7 +73,7 @@ public interface IQueryProcessExecutor {
throws ProcessorException, IOException, PathErrorException, StorageEngineException, QueryFilterOptimizationException;
/**
- * process fill plan of qp layer, construct queryDataSet.
+ * process deserialize plan of qp layer, construct queryDataSet.
*/
QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
QueryContext context)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 7816f8f..024b8f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -231,7 +231,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
@Override
- public boolean insert(InsertPlan insertPlan) throws ProcessorException {
+ public synchronized boolean insert(InsertPlan insertPlan) throws ProcessorException {
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index b3c8ae6..ab86d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -811,7 +811,7 @@ public class LogicalGenerator {
}
/**
- * analyze fill type clause.
+ * analyze deserialize type clause.
*
* <P>PreviousClause : PREVIOUS COMMA < ValidPreviousTime > LinearClause : LINEAR COMMA <
* ValidPreviousTime > COMMA < ValidBehindTime >
@@ -819,7 +819,7 @@ public class LogicalGenerator {
private void analyzeFill(AstNode node) throws LogicalOperatorException {
FilterOperator filterOperator = ((SFWOperator) initializedOperator).getFilterOperator();
if (!filterOperator.isLeaf() || filterOperator.getTokenIntType() != SQLConstant.EQUAL) {
- throw new LogicalOperatorException("Only \"=\" can be used in fill function");
+ throw new LogicalOperatorException("Only \"=\" can be used in deserialize function");
}
Map<TSDataType, IFill> fillTypes = new EnumMap<>(TSDataType.class);
@@ -839,7 +839,7 @@ public class LogicalGenerator {
fillTypes.put(dataType, new LinearFill(-1, -1));
} else {
throw new LogicalOperatorException(
- "Linear fill type must have 0 or 2 valid time ranges");
+ "Linear deserialize type must have 0 or 2 valid time ranges");
}
break;
case TOK_PREVIOUS:
@@ -851,7 +851,7 @@ public class LogicalGenerator {
fillTypes.put(dataType, new PreviousFill(-1));
} else {
throw new LogicalOperatorException(
- "Previous fill type must have 0 or 1 valid time range");
+ "Previous deserialize type must have 0 or 1 valid time range");
}
break;
default:
@@ -871,7 +871,7 @@ public class LogicalGenerator {
case DOUBLE:
if (type != TOK_LINEAR && type != TOK_PREVIOUS) {
throw new LogicalOperatorException(
- String.format("type %s cannot use %s fill function", dataType,
+ String.format("type %s cannot use %s deserialize function", dataType,
TqlParser.tokenNames[type]));
}
return;
@@ -879,7 +879,7 @@ public class LogicalGenerator {
case TEXT:
if (type != TOK_PREVIOUS) {
throw new LogicalOperatorException(
- String.format("type %s cannot use %s fill function", dataType,
+ String.format("type %s cannot use %s deserialize function", dataType,
TqlParser.tokenNames[type]));
}
return;
@@ -907,7 +907,7 @@ public class LogicalGenerator {
case "text":
return TSDataType.TEXT;
default:
- throw new LogicalOperatorException("not a valid fill type : " + type);
+ throw new LogicalOperatorException("not a valid deserialize type : " + type);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 7373956..8ba9613 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -254,7 +254,7 @@ public class PhysicalGenerator {
// multiQueryPlan.setOrigin(queryOperator.getOrigin());
// multiQueryPlan.setIntervals(queryOperator.getIntervals());
// return multiQueryPlan;
- // } else if (queryOperator.isFill()) { // old fill query
+ // } else if (queryOperator.isFill()) { // old deserialize query
// multiQueryPlan.setType(MultiQueryPlan.QueryType.FILL);
// FilterOperator timeFilter = queryOperator.getFilterOperator();
// if (!timeFilter.isSingle())
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
index 139d214..a5a779a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
@@ -63,7 +63,7 @@ public class DeviceIterateDataSet extends QueryDataSet {
private long origin;
private List<Pair<Long, Long>> intervals;
- // fill parameters
+ // deserialize parameters
private long queryTime;
private Map<TSDataType, IFill> fillType;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
index c4a3564..8e90ab9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -52,7 +52,7 @@ public class FillEngineExecutor {
}
/**
- * execute fill.
+ * execute deserialize.
*
* @param context query context
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
index eea20c9..a41ba2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -66,7 +66,7 @@ public interface IEngineQueryRouter {
throws ProcessorException, QueryFilterOptimizationException, StorageEngineException,
PathErrorException, IOException;
/**
- * Execute fill query.
+ * Execute deserialize query.
*
* @param fillPaths select path list
* @param queryTime timestamp
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index 4cf7eb0..342679a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -71,7 +71,7 @@ public abstract class IFill {
}
private Filter constructFilter(long beforeRange) {
- // if the fill time range is not set, beforeRange will be set to -1.
+ // if the deserialize time range is not set, beforeRange will be set to -1.
if (beforeRange == -1) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index bc5efff..97d9009 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -142,7 +142,7 @@ public class LinearFill extends IFill {
beforePair.setValue(TsPrimitiveType.getByType(TSDataType.DOUBLE, fillDoubleValue));
break;
default:
- throw new UnSupportedFillTypeException("Unsupported linear fill data type : " + dataType);
+ throw new UnSupportedFillTypeException("Unsupported linear deserialize data type : " + dataType);
}
beforePair.setTimestamp(queryTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 56d247d..2ec8163 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -194,7 +194,7 @@ public class BinaryTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 1f2d264..adc0be9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -193,7 +193,7 @@ public class BooleanTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 4efd2b4..1696d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -193,7 +193,7 @@ public class DoubleTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index ce33e1f..c558471 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -193,7 +193,7 @@ public class FloatTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 8726a9e..1df61fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -193,7 +193,7 @@ public class IntTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 6699ef4..df3bee6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -193,7 +193,7 @@ public class LongTVList extends TVList {
size += inputRemaining;
break;
} else {
- // the remaining inputs cannot fit the last array, fill the last array and create a new
+ // the remaining inputs cannot fit the last array, deserialize the last array and create a new
// one and enter the next loop
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index 09ad27a..599b71e 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -128,7 +128,7 @@ public class PhysicalPlanTest {
plan.toString());
}
- // TODO uncomment these code when implement aggregation and fill function
+ // TODO uncomment these code when implement aggregation and deserialize function
@Test
public void testAggregation()
@@ -222,7 +222,7 @@ public class PhysicalPlanTest {
try {
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
} catch (Exception e) {
- assertEquals("Only \"=\" can be used in fill function", e.getMessage().toString());
+ assertEquals("Only \"=\" can be used in deserialize function", e.getMessage().toString());
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sql/TqlParserTest.java b/server/src/test/java/org/apache/iotdb/db/sql/TqlParserTest.java
index e5eb955..c82d13e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sql/TqlParserTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sql/TqlParserTest.java
@@ -1105,7 +1105,7 @@ public class TqlParserTest {
"TOK_TYPE", "float", "TOK_PREVIOUS", "TOK_DURATION", "11h"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator.generateAST(
- "select s1 " + "FROM root.vehicle.d1 " + "WHERE time = 1234567 fill(float[previous, 11h])");
+ "select s1 " + "FROM root.vehicle.d1 " + "WHERE time = 1234567 deserialize(float[previous, 11h])");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
@@ -1127,7 +1127,7 @@ public class TqlParserTest {
"TOK_TYPE", "float", "TOK_PREVIOUS"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator
- .generateAST("select s1 FROM root.vehicle.d1 WHERE time = 1234567 fill(float[previous])");
+ .generateAST("select s1 FROM root.vehicle.d1 WHERE time = 1234567 deserialize(float[previous])");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
@@ -1150,7 +1150,7 @@ public class TqlParserTest {
"123d"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator.generateAST(
- "select s1 FROM root.vehicle.d1 WHERE time = 1234567 fill(boolean[linear, 31s, 123d])");
+ "select s1 FROM root.vehicle.d1 WHERE time = 1234567 deserialize(boolean[linear, 31s, 123d])");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
@@ -1172,7 +1172,7 @@ public class TqlParserTest {
"TOK_TYPE", "boolean", "TOK_LINEAR"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator
- .generateAST("select s1 FROM root.vehicle.d1 WHERE time = 1234567 fill(boolean[linear])");
+ .generateAST("select s1 FROM root.vehicle.d1 WHERE time = 1234567 deserialize(boolean[linear])");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
@@ -1197,7 +1197,7 @@ public class TqlParserTest {
"TOK_PREVIOUS", "TOK_TIMEUNIT", "66", "w"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator.generateAST(
- "select s1,s2 FROM root.vehicle.d1 WHERE time = 1234567 fill(int[linear, 5ms, 7d], float[previous], boolean[linear], text[previous, 66w])");
+ "select s1,s2 FROM root.vehicle.d1 WHERE time = 1234567 deserialize(int[linear, 5ms, 7d], float[previous], boolean[linear], text[previous, 66w])");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
@@ -1458,7 +1458,7 @@ public class TqlParserTest {
"50"));
ArrayList<String> rec = new ArrayList<>();
AstNode astTree = ParseGenerator.generateAST(
- "select * " + "FROM root.vehicle.d1 " + "WHERE time = 1234567 fill(float[previous, 11h])"
+ "select * " + "FROM root.vehicle.d1 " + "WHERE time = 1234567 deserialize(float[previous, 11h])"
+ "slimit 15 " + "soffset 50");
astTree = ParseUtils.findRootNonNullToken(astTree);
recursivePrintSon(astTree, rec);
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/sparkdb/Transformer.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/sparkdb/Transformer.scala
index ae47fed..36e1414 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/sparkdb/Transformer.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/sparkdb/Transformer.scala
@@ -86,7 +86,7 @@ object Transformer {
query += ", `" + device_name + "." + m + "` as " + m
}
else {
- // fill null column
+ // fill null column
query += ", NULL as " + m
}
}
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
index eeff973..7d8418c 100644
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
@@ -87,7 +87,7 @@ object Transformer {
query += ", `" + device_name + "." + m + "` as " + m
}
else {
- // fill null column
+ // fill null column
query += ", NULL as " + m
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/GorillaDecoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/GorillaDecoder.java
index d67bc2f..85fbb0e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/GorillaDecoder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/GorillaDecoder.java
@@ -87,7 +87,7 @@ public abstract class GorillaDecoder extends Decoder {
this.buffer = ReadWriteIOUtils.read(buffer);
numberLeftInBuffer = 8;
} else {
- logger.error("Failed to fill a new buffer, because there is no byte to read");
+ logger.error("Failed to fill a new buffer, because there is no byte to read");
this.buffer = EOF;
numberLeftInBuffer = -1;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index c541d98..64dc577 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -195,7 +195,7 @@ public class BinaryStatistics extends Statistics<Binary> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = new Binary(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream));
this.max = new Binary(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream));
this.first = new Binary(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream));
@@ -204,7 +204,7 @@ public class BinaryStatistics extends Statistics<Binary> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = new Binary(
ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(byteBuffer).array());
this.max = new Binary(
@@ -217,11 +217,11 @@ public class BinaryStatistics extends Statistics<Binary> {
}
@Override
- protected void fill(TsFileInput input, long offset) throws IOException {
+ protected void deserialize(TsFileInput input, long offset) throws IOException {
int size = getSerializedSize();
ByteBuffer buffer = ByteBuffer.allocate(size);
ReadWriteIOUtils.readAsPossible(input, offset, buffer);
buffer.flip();
- fill(buffer);
+ deserialize(buffer);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
index 2e5fda7..097dba0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
@@ -193,7 +193,7 @@ public class BooleanStatistics extends Statistics<Boolean> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = ReadWriteIOUtils.readBool(inputStream);
this.max = ReadWriteIOUtils.readBool(inputStream);
this.first = ReadWriteIOUtils.readBool(inputStream);
@@ -202,7 +202,7 @@ public class BooleanStatistics extends Statistics<Boolean> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = ReadWriteIOUtils.readBool(byteBuffer);
this.max = ReadWriteIOUtils.readBool(byteBuffer);
this.first = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index 296f4d0..250aaac 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -193,7 +193,7 @@ public class DoubleStatistics extends Statistics<Double> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = ReadWriteIOUtils.readDouble(inputStream);
this.max = ReadWriteIOUtils.readDouble(inputStream);
this.first = ReadWriteIOUtils.readDouble(inputStream);
@@ -202,7 +202,7 @@ public class DoubleStatistics extends Statistics<Double> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = ReadWriteIOUtils.readDouble(byteBuffer);
this.max = ReadWriteIOUtils.readDouble(byteBuffer);
this.first = ReadWriteIOUtils.readDouble(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
index 33e1d6d..e1f487b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
@@ -184,7 +184,7 @@ public class FloatStatistics extends Statistics<Float> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = ReadWriteIOUtils.readFloat(inputStream);
this.max = ReadWriteIOUtils.readFloat(inputStream);
this.first = ReadWriteIOUtils.readFloat(inputStream);
@@ -193,7 +193,7 @@ public class FloatStatistics extends Statistics<Float> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = ReadWriteIOUtils.readFloat(byteBuffer);
this.max = ReadWriteIOUtils.readFloat(byteBuffer);
this.first = ReadWriteIOUtils.readFloat(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
index aa10c02..f2ab8cf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
@@ -187,7 +187,7 @@ public class IntegerStatistics extends Statistics<Integer> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = ReadWriteIOUtils.readInt(inputStream);
this.max = ReadWriteIOUtils.readInt(inputStream);
this.first = ReadWriteIOUtils.readInt(inputStream);
@@ -196,7 +196,7 @@ public class IntegerStatistics extends Statistics<Integer> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = ReadWriteIOUtils.readInt(byteBuffer);
this.max = ReadWriteIOUtils.readInt(byteBuffer);
this.first = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
index af37fa4..0e4ace5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
@@ -194,7 +194,7 @@ public class LongStatistics extends Statistics<Long> {
}
@Override
- void fill(InputStream inputStream) throws IOException {
+ void deserialize(InputStream inputStream) throws IOException {
this.min = ReadWriteIOUtils.readLong(inputStream);
this.max = ReadWriteIOUtils.readLong(inputStream);
this.first = ReadWriteIOUtils.readLong(inputStream);
@@ -203,7 +203,7 @@ public class LongStatistics extends Statistics<Long> {
}
@Override
- void fill(ByteBuffer byteBuffer) throws IOException {
+ void deserialize(ByteBuffer byteBuffer) throws IOException {
this.min = ReadWriteIOUtils.readLong(byteBuffer);
this.max = ReadWriteIOUtils.readLong(byteBuffer);
this.first = ReadWriteIOUtils.readLong(byteBuffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/NoStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/NoStatistics.java
index 812f679..9f66ba4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/NoStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/NoStatistics.java
@@ -170,12 +170,12 @@ public class NoStatistics extends Statistics<Long> {
}
@Override
- void fill(InputStream inputStream) {
+ void deserialize(InputStream inputStream) {
// NoStatistics does not make any statistics
}
@Override
- void fill(ByteBuffer byteBuffer) {
+ void deserialize(ByteBuffer byteBuffer) {
// NoStatistics does not make any statistics
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 8ef6f72..2e20555 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -75,14 +75,14 @@ public abstract class Statistics<T> {
public static Statistics deserialize(InputStream inputStream, TSDataType dataType)
throws IOException {
Statistics statistics = getStatsByType(dataType);
- statistics.fill(inputStream);
+ statistics.deserialize(inputStream);
statistics.isEmpty = false;
return statistics;
}
public static Statistics deserialize(ByteBuffer buffer, TSDataType dataType) throws IOException {
Statistics statistics = getStatsByType(dataType);
- statistics.fill(buffer);
+ statistics.deserialize(buffer);
statistics.isEmpty = false;
return statistics;
}
@@ -90,7 +90,7 @@ public abstract class Statistics<T> {
public static Statistics deserialize(TsFileInput input, long offset, TSDataType dataType)
throws IOException {
Statistics statistics = getStatsByType(dataType);
- statistics.fill(input, offset);
+ statistics.deserialize(input, offset);
statistics.isEmpty = false;
return statistics;
}
@@ -242,16 +242,16 @@ public abstract class Statistics<T> {
/**
* read data from the inputStream.
*/
- abstract void fill(InputStream inputStream) throws IOException;
+ abstract void deserialize(InputStream inputStream) throws IOException;
- abstract void fill(ByteBuffer byteBuffer) throws IOException;
+ abstract void deserialize(ByteBuffer byteBuffer) throws IOException;
- protected void fill(TsFileInput input, long offset) throws IOException {
+ protected void deserialize(TsFileInput input, long offset) throws IOException {
int size = getSerializedSize();
ByteBuffer buffer = ByteBuffer.allocate(size);
ReadWriteIOUtils.readAsPossible(input, offset, buffer);
buffer.flip();
- fill(buffer);
+ deserialize(buffer);
}
public int getSerializedSize() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 4345505..27c58a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -69,23 +69,18 @@ public class ChunkBuffer {
return numOfPages;
}
- public void setNumOfPages(int numOfPages) {
- this.numOfPages = numOfPages;
- }
-
/**
* write the page header and data into the PageWriter's output stream.
*
* @param data the data of the page
* @param valueCount - the amount of values in that page
- * @param statistics - the statistics for that page
+ * @param statistics - page statistics
* @param maxTimestamp - timestamp maximum in given data
* @param minTimestamp - timestamp minimum in given data
* @return byte size of the page header and uncompressed data in the page body.
*/
public int writePageHeaderAndDataIntoBuff(ByteBuffer data, int valueCount,
- Statistics<?> statistics,
- long maxTimestamp, long minTimestamp) throws PageException {
+ Statistics<?> statistics, long maxTimestamp, long minTimestamp) throws PageException {
numOfPages++;
// 1. update time statistics
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
index e6422a3..a15e3e5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
@@ -57,8 +57,7 @@ public class ChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public void tryToAddSeriesWriter(MeasurementSchema schema, int pageSizeThreshold) {
if (!chunkWriters.containsKey(schema.getMeasurementId())) {
- ChunkBuffer chunkBuffer = new ChunkBuffer(schema);
- IChunkWriter seriesWriter = new ChunkWriterImpl(chunkBuffer, pageSizeThreshold);
+ IChunkWriter seriesWriter = new ChunkWriterImpl(schema);
this.chunkWriters.put(schema.getMeasurementId(), seriesWriter);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 295f4fb..d5bad0a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.tsfile.write.chunk;
import java.io.IOException;
import java.math.BigDecimal;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -43,81 +42,75 @@ public class ChunkWriterImpl implements IChunkWriter {
private static final Logger LOG = LoggerFactory.getLogger(ChunkWriterImpl.class);
- // initial value for this.valueCountInOnePageForNextCheck
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+ private MeasurementSchema measurementSchema;
- private final TSDataType dataType;
/**
* help to encode data of this series.
*/
private final ChunkBuffer chunkBuffer;
+
/**
- * page size threshold.
+ * value writer to encode data.
*/
- private final long psThres;
- private final int pageCountUpperBound;
+ private PageWriter pageWriter;
+
/**
- * value writer to encode data.
+ * page size threshold.
*/
- private PageWriter dataPageWriter;
+ private final long pageSizeThreshold;
+
+ private final int maxNumberOfPointsInPage;
+
+ // initial value for this.valueCountInOnePageForNextCheck
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
/**
* value count in a page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
*/
private int valueCountInOnePage;
private int valueCountInOnePageForNextCheck;
- /**
- * statistic on a page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
- */
- private Statistics<?> pageStatistics;
+
/**
* statistic on a stage. It will be reset after calling {@code writeAllPagesOfSeriesToTsFile()}
*/
private Statistics<?> chunkStatistics;
- // time of the latest written time value pair
- private long time;
- private long minTimestamp = Long.MIN_VALUE;
- private MeasurementSchema measurementSchema;
+ /**
+ * statistic on a page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
+ */
+ private Statistics<?> pageStatistics;
+
+ // time of the latest written time value pair, we assume data is written in time order
+ private long maxTimestamp;
+ private long minTimestamp = Long.MIN_VALUE;
/**
- * constructor of ChunkWriterImpl.
- *
- * @param chunkBuffer chunk in buffer
- * @param pageSizeThreshold page size threshold
+ * @param schema schema of this measurement
*/
- public ChunkWriterImpl(ChunkBuffer chunkBuffer, int pageSizeThreshold) {
- this.measurementSchema = chunkBuffer.getSchema();
- this.dataType = measurementSchema.getType();
- this.chunkBuffer = chunkBuffer;
- this.psThres = pageSizeThreshold;
+ public ChunkWriterImpl(MeasurementSchema schema) {
+ this.measurementSchema = schema;
+ this.chunkBuffer = new ChunkBuffer(measurementSchema);
+ this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
// initial check of memory usage. So that we have enough data to make an initial prediction
this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
// init statistics for this series and page
- this.chunkStatistics = Statistics.getStatsByType(dataType);
- resetPageStatistics();
+ this.chunkStatistics = Statistics.getStatsByType(measurementSchema.getType());
+ this.pageStatistics = Statistics.getStatsByType(measurementSchema.getType());
- this.dataPageWriter = new PageWriter();
- this.pageCountUpperBound = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
-
- this.dataPageWriter.setTimeEncoder(measurementSchema.getTimeEncoder());
- this.dataPageWriter.setValueEncoder(measurementSchema.getValueEncoder());
- }
+ this.pageWriter = new PageWriter();
- /**
- * reset statistics of page by dataType of this measurement.
- */
- private void resetPageStatistics() {
- this.pageStatistics = Statistics.getStatsByType(dataType);
+ this.pageWriter.setTimeEncoder(measurementSchema.getTimeEncoder());
+ this.pageWriter.setValueEncoder(measurementSchema.getValueEncoder());
}
@Override
public void write(long time, long value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -127,9 +120,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, int value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -139,9 +132,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, boolean value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -151,9 +144,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, float value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -163,9 +156,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, double value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -175,9 +168,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, BigDecimal value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -187,9 +180,9 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long time, Binary value) {
- this.time = time;
+ this.maxTimestamp = time;
++valueCountInOnePage;
- dataPageWriter.write(time, value);
+ pageWriter.write(time, value);
pageStatistics.updateStats(value);
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = time;
@@ -199,84 +192,84 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void write(long[] timestamps, int[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, long[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, boolean[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, float[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, double[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, Binary[] values, int batchSize) {
- this.time = timestamps[batchSize - 1];
+ this.maxTimestamp = timestamps[batchSize - 1];
valueCountInOnePage += batchSize;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values, batchSize);
+ pageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@@ -286,31 +279,24 @@ public class ChunkWriterImpl implements IChunkWriter {
* OutputStream.
*/
private void checkPageSizeAndMayOpenANewPage() {
- if (valueCountInOnePage == pageCountUpperBound) {
+ if (valueCountInOnePage == maxNumberOfPointsInPage) {
LOG.debug("current line count reaches the upper bound, write page {}", measurementSchema);
writePage();
- } else if (valueCountInOnePage
- >= valueCountInOnePageForNextCheck) { // need to check memory size
+ } else if (valueCountInOnePage >= valueCountInOnePageForNextCheck) { // need to check memory size
// not checking the memory used for every value
- long currentColumnSize = dataPageWriter.estimateMaxMemSize();
- if (currentColumnSize > psThres) { // memory size exceeds threshold
+ long currentPageSize = pageWriter.estimateMaxMemSize();
+ if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
// we will write the current page
LOG.debug(
- "enough size, write page {}, psThres:{}, currentColumnSize:{}, valueCountInOnePage:{}",
- measurementSchema.getMeasurementId(), psThres, currentColumnSize, valueCountInOnePage);
+ "enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
+ measurementSchema.getMeasurementId(), pageSizeThreshold, currentPageSize, valueCountInOnePage);
writePage();
valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
} else {
// reset the valueCountInOnePageForNextCheck for the next page
- valueCountInOnePageForNextCheck = (int) (((float) psThres / currentColumnSize)
+ valueCountInOnePageForNextCheck = (int) (((float) pageSizeThreshold / currentPageSize)
* valueCountInOnePage);
- LOG.debug(
- "not enough size. {}, psThres:{}, currentColumnSize:{}, now valueCountInOnePage: {}, "
- + "change to {}",
- measurementSchema.getMeasurementId(), psThres, currentColumnSize, valueCountInOnePage,
- valueCountInOnePageForNextCheck);
}
-
}
}
@@ -319,14 +305,13 @@ public class ChunkWriterImpl implements IChunkWriter {
*/
private void writePage() {
try {
- chunkBuffer.writePageHeaderAndDataIntoBuff(dataPageWriter.getUncompressedBytes(),
- valueCountInOnePage,
- pageStatistics, time, minTimestamp);
+ chunkBuffer.writePageHeaderAndDataIntoBuff(pageWriter.getUncompressedBytes(),
+ valueCountInOnePage, pageStatistics, maxTimestamp, minTimestamp);
// update statistics of this series
this.chunkStatistics.mergeStatistics(this.pageStatistics);
} catch (IOException e) {
- LOG.error("meet error in dataPageWriter.getUncompressedBytes(),ignore this page:", e);
+ LOG.error("meet error in pageWriter.getUncompressedBytes(),ignore this page:", e);
} catch (PageException e) {
LOG.error(
"meet error in chunkBuffer.writePageHeaderAndDataIntoBuff, ignore this page:", e);
@@ -334,8 +319,8 @@ public class ChunkWriterImpl implements IChunkWriter {
// clear start time stamp for next initializing
minTimestamp = Long.MIN_VALUE;
valueCountInOnePage = 0;
- dataPageWriter.reset();
- resetPageStatistics();
+ pageWriter.reset();
+ this.pageStatistics = Statistics.getStatsByType(measurementSchema.getType());
}
}
@@ -345,12 +330,12 @@ public class ChunkWriterImpl implements IChunkWriter {
chunkBuffer.writeAllPagesOfSeriesToTsFile(tsfileWriter, chunkStatistics);
chunkBuffer.reset();
// reset series_statistics
- this.chunkStatistics = Statistics.getStatsByType(dataType);
+ this.chunkStatistics = Statistics.getStatsByType(measurementSchema.getType());
}
@Override
public long estimateMaxSeriesMemSize() {
- return dataPageWriter.estimateMaxMemSize() + chunkBuffer.estimateMaxPageMemSize();
+ return pageWriter.estimateMaxMemSize() + chunkBuffer.estimateMaxPageMemSize();
}
@Override
@@ -378,6 +363,6 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public TSDataType getDataType() {
- return dataType;
+ return measurementSchema.getType();
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index fffa6bd..ff00977 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -179,7 +179,7 @@ public class TsFileIOWriter {
* @param descriptor - measurement of this time series
* @param compressionCodecName - compression name of this time series
* @param tsDataType - data type
- * @param statistics - statistic of the whole series
+ * @param statistics - Chunk statistics
* @param maxTime - maximum timestamp of the whole series in this stage
* @param minTime - minimum timestamp of the whole series in this stage
* @param dataSize - the serialized size of all pages
@@ -188,16 +188,15 @@ public class TsFileIOWriter {
*/
public int startFlushChunk(MeasurementSchema descriptor, CompressionType compressionCodecName,
TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, long maxTime,
- long minTime,
- int dataSize, int numOfPages) throws IOException {
- logger.debug("start series chunk:{}, file position {}", descriptor, out.getPosition());
+ long minTime, int dataSize, int numOfPages) throws IOException {
currentChunkMetaData = new ChunkMetaData(descriptor.getMeasurementId(), tsDataType,
out.getPosition(), minTime, maxTime);
+ // flush ChunkHeader to TsFileIOWriter
+ logger.debug("start series chunk:{}, file position {}", descriptor, out.getPosition());
ChunkHeader header = new ChunkHeader(descriptor.getMeasurementId(), dataSize, tsDataType,
- compressionCodecName,
- encodingType, numOfPages);
+ compressionCodecName, encodingType, numOfPages);
header.serializeTo(out.wrapAsStream());
logger.debug("finish series chunk:{} header, file position {}", header, out.getPosition());