You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/21 13:01:51 UTC
[incubator-iotdb] 01/01: Merge branch 'master' into
dev_new_merge_strategy
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b8f7c1bf668e480b933bb854929374e51a57c3b2
Merge: f83a05b def4daf
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Nov 21 21:01:30 2019 +0800
Merge branch 'master' into dev_new_merge_strategy
# Conflicts:
# server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
# server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
# server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
# server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
# server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
# server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
# server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
# server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
# server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
# tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
# tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
# tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
# tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
.mvn/wrapper/MavenWrapperDownloader.java | 50 +-
.travis.yml | 16 +
LICENSE | 10 +-
LICENSE-binary | 4 +
NOTICE | 10 +-
NOTICE-binary | 10 +-
README.md | 67 +-
RELEASE_NOTES.md | 278 +++--
client-py/readme.md | 28 +-
client-py/src/client_example.py | 196 +++-
.../src/assembly/resources/sbin/start-client.bat | 3 +
client/src/assembly/resources/sbin/start-client.sh | 2 +
.../org/apache/iotdb/client/AbstractClient.java | 14 +-
docs/Documentation-CHN/OtherMaterial-Examples.md | 83 --
.../0-Get Started/1-QuickStart.md} | 50 +-
.../0-Get Started/2-Frequently asked questions.md} | 0
.../0-Get Started/3-Publication.md} | 2 +-
.../UserGuide/1-Overview/2-Architecture.md | 2 +-
.../UserGuide/1-Overview/4-Features.md | 4 +-
.../1-Data Model and Terminology.md} | 67 +-
.../2-Data Type.md | 2 +-
.../3-Encoding.md | 4 +-
.../4-Compression.md | 7 +-
.../UserGuide/3-Operation Manual/3-Data Import.md | 85 --
.../UserGuide/3-Operation Manual/4-Data Query.md | 503 ---------
.../3-Operation Manual/5-Data Maintenance.md | 88 --
.../1-Deployment.md => 3-Server/1-Download.md} | 9 +-
.../2-Single Node Setup.md} | 3 +-
.../3-Cluster Setup.md} | 3 +-
.../4-Config Manual.md} | 138 ++-
.../5-Docker Image.md} | 2 +-
.../1-Command Line Interface (CLI).md} | 40 +-
.../2-Programming - JDBC.md} | 33 +-
.../3-Programming - Session.md} | 4 +-
.../4-Programming - Other Languages.md} | 4 +-
.../5-Programming - TsFile API.md} | 10 +-
.../3-System Monitor.md | 152 ---
.../4-Performance Monitor.md | 87 --
.../4-Deployment and Management/5-System log.md | 64 --
.../6-Data Management.md | 74 --
.../5-IoTDB SQL Documentation/2-Reference.md | 137 ---
.../1-DDL (Data Definition Language).md} | 66 +-
.../2-DML (Data Manipulation Language).md | 193 ++++
.../3-Account Management Statements.md} | 8 +-
.../4-SQL Reference.md} | 244 ++++-
.../1-Sync.md => 6-System Tools/1-Sync Tool.md} | 61 +-
.../2-Memory Estimation Tool.md | 8 +-
.../3-JMX Tool.md | 2 +-
.../4-Watermark Tool.md} | 3 +-
.../5-Log Visualizer.md} | 4 +-
.../6-Query History Visualization Tool.md} | 12 +-
.../6-System Tools/7-Monitor and Log Tools.md | 257 +++++
.../1-Grafana.md} | 2 +-
.../2-MapReduce TsFile.md} | 25 +-
.../3-Spark TsFile.md} | 4 +-
.../4-Spark IoTDB.md} | 3 +-
.../7-Ecosystem Integration/5-Hive TsFile.md | 192 ++++
.../1-Hierarchy.md} | 2 +-
.../8-System Design (Developer)/2-Files.md | 63 ++
.../3-Writing Data on HDFS.md | 171 ++++
.../4-Shared Nothing Cluster.md} | 6 +-
.../UserGuide/8-TsFile/3-Hierarchy.md | 24 -
.../UserGuide/9-Tools-spark-iotdb.md | 24 -
.../UserGuide/9-Tools-spark-tsfile.md | 24 -
docs/Documentation/OtherMaterial-Examples.md | 84 --
docs/Documentation/OtherMaterial-Sample Data.md | 69 --
docs/Documentation/UserGuide/0-Content.md | 78 +-
.../0-Get Started/1-QuickStart.md} | 46 +-
.../0-Get Started/2-Frequently asked questions.md} | 6 +-
.../0-Get Started/3-Publication.md} | 0
.../UserGuide/1-Overview/2-Architecture.md | 2 +-
.../UserGuide/1-Overview/4-Features.md | 33 +-
.../1-Data Model and Terminology.md} | 71 +-
.../2-Data Type.md | 2 +-
.../3-Encoding.md | 4 +-
.../4-Compression.md | 8 +-
.../UserGuide/3-Operation Manual/3-Data Import.md | 87 --
.../3-Operation Manual/5-Data Maintenance.md | 86 --
.../Documentation/UserGuide/3-Server/1-Download.md | 75 ++
.../UserGuide/3-Server/2-Single Node Setup.md} | 12 +-
.../UserGuide/3-Server/3-Cluster Setup.md} | 6 +-
.../4-Config Manual.md} | 133 ++-
.../5-Docker Image.md} | 14 +-
.../1-Command Line Interface (CLI).md} | 48 +-
.../2-Programming - JDBC.md} | 72 +-
.../3-Programming - Session.md} | 49 +-
.../4-Client/4-Programming - Other Languages.md | 11 +-
.../5-Programming - TsFile API.md} | 391 ++++---
.../4-Deployment and Management/1-Deployment.md | 160 ---
.../4-Performance Monitor.md | 90 --
.../4-Deployment and Management/5-System log.md | 66 --
.../5-IoTDB SQL Documentation/2-Reference.md | 137 ---
.../1-DDL (Data Definition Language).md} | 73 +-
.../2-DML (Data Manipulation Language).md} | 132 ++-
.../3-Account Management Statements.md} | 10 +-
.../4-SQL Reference.md} | 258 ++++-
.../UserGuide/6-JDBC API/2-Status Code.md | 64 --
.../1-Sync.md => 6-System Tools/1-Sync Tool.md} | 74 +-
.../2-Memory Estimation Tool.md | 17 +-
.../3-JMX Tool.md | 2 +-
.../4-Watermark Tool.md} | 14 +-
.../5-Log Visualizer.md} | 2 +-
.../6-Query History Visualization Tool.md} | 12 +-
.../7-Monitor and Log Tools.md} | 124 ++-
.../1-Grafana.md} | 3 +-
.../2-MapReduce TsFile.md} | 26 +-
.../3-Spark TsFile.md} | 45 +-
.../4-Spark IoTDB.md} | 20 +-
.../7-Ecosystem Integration/5-Hive TsFile.md | 190 ++++
.../8-System Design (Developer)/1-Hierarchy.md | 418 ++++++++
.../2-Files.md} | 13 +-
.../3-Writing Data on HDFS.md | 171 ++++
.../4-Shared Nothing Cluster.md} | 6 +-
.../UserGuide/8-TsFile/1-Installation.md | 96 --
.../UserGuide/8-TsFile/3-Hierarchy.md | 383 -------
.../hadoop => hadoop/tsfile}/TSFMRReadExample.java | 10 +-
.../hadoop => hadoop/tsfile}/TSMRWriteExample.java | 9 +-
.../hadoop => hadoop/tsfile}/TsFileHelper.java | 4 +-
.../iotdb/tsfile/TsFileWriteWithRowBatch.java | 6 +-
.../iotdb/tsfile/TsFileWriteWithTSRecord.java | 20 +-
grafana/readme.md | 61 +-
grafana/readme_zh.md | 109 +-
hadoop/README.md | 192 +++-
.../iotdb/hadoop/fileSystem/HDFSConfUtil.java | 88 ++
.../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 6 +-
.../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 5 +-
.../org/apache/iotdb/hadoop/tsfile/IReaderSet.java | 22 +-
.../apache/iotdb/hadoop/tsfile/TSFInputFormat.java | 24 +-
.../apache/iotdb/hadoop/tsfile/TSFInputSplit.java | 11 +-
.../iotdb/hadoop/tsfile/TSFRecordReader.java | 98 +-
.../iotdb/hadoop/tsfile/record/HDFSTSRecord.java | 2 +-
.../hadoop => hadoop/tsfile}/TSFHadoopTest.java | 6 +-
.../tsfile}/TSFInputSplitTest.java | 12 +-
.../hadoop => hadoop/tsfile}/TsFileTestHelper.java | 4 +-
hive-connector/pom.xml | 150 +++
.../org/apache/iotdb/hive/TSFHiveInputFormat.java | 51 +
.../org/apache/iotdb/hive/TSFHiveOutputFormat.java | 67 ++
.../org/apache/iotdb/hive/TSFHiveRecordReader.java | 152 +++
.../org/apache/iotdb/hive/TSFHiveRecordWriter.java | 69 ++
.../org/apache/iotdb/hive/TsFileDeserializer.java | 149 +++
.../java/org/apache/iotdb/hive/TsFileSerDe.java | 158 +++
.../apache/iotdb/hive/TsFileSerDeException.java | 18 +-
.../apache/iotdb/hive/TSFHiveInputFormatTest.java | 112 ++
.../apache/iotdb/hive/TSFHiveRecordReaderTest.java | 132 +++
.../apache/iotdb/hive/TsFileDeserializerTest.java | 118 +++
.../org/apache/iotdb/hive/TsFileSerDeTest.java | 141 +++
.../org/apache/iotdb/hive}/TsFileTestHelper.java | 6 +-
jdbc/README.md | 70 +-
.../main/java/org/apache/iotdb/jdbc/Config.java | 24 +-
.../main/java/org/apache/iotdb/jdbc/Constant.java | 29 +-
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 46 +-
.../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 303 ++----
.../apache/iotdb/jdbc/IoTDBMetadataResultSet.java | 68 +-
.../jdbc/IoTDBPreparedInsertionStatement.java | 2 +-
.../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 30 +-
.../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 24 +-
.../org/apache/iotdb/jdbc/IoTDBSQLException.java | 5 +
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 161 +--
.../src/main/java/org/apache/iotdb/jdbc/Utils.java | 139 +--
.../test/java/org/apache/iotdb/jdbc/BatchTest.java | 29 +-
.../iotdb/jdbc/IoTDBDatabaseMetadataTest.java | 56 +-
.../apache/iotdb/jdbc/IoTDBQueryResultSetTest.java | 238 +++--
.../org/apache/iotdb/jdbc/IoTDBStatementTest.java | 40 +-
.../test/java/org/apache/iotdb/jdbc/UtilsTest.java | 241 +++--
licenses/The MIT License | 11 -
licenses/The MIT License (QOS.ch) | 21 +
licenses/The MIT License (progressbar) | 21 +
pom.xml | 24 +-
server/pom.xml | 5 +
server/server-changelist.md | 39 +
.../resources/conf/iotdb-engine.properties | 111 +-
server/src/assembly/resources/conf/iotdb-env.bat | 22 +-
server/src/assembly/resources/conf/iotdb-env.sh | 22 +-
.../resources/conf/iotdb-sync-client.properties | 16 +-
server/src/assembly/resources/conf/logback.xml | 20 +
.../resources/conf/tsfile-format.properties | 50 -
.../src/assembly/resources/sbin/start-server.bat | 37 +-
server/src/assembly/resources/sbin/start-server.sh | 16 -
.../assembly/resources/tools/start-sync-client.bat | 2 +-
.../assembly/resources/tools/start-sync-client.sh | 2 +-
.../upgrade/config.properties} | 24 +-
.../offline-upgrade.bat} | 23 +-
.../resources/tools/upgrade/offline-upgrade.sh | 23 +-
.../org/apache/iotdb/db/sql/parse/TqlLexer.g | 37 +
.../org/apache/iotdb/db/sql/parse/TqlParser.g | 129 ++-
.../iotdb/db/auth/user/LocalFileUserAccessor.java | 20 +-
.../db/concurrent/IoTDBThreadPoolFactory.java | 11 +
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 316 ++++--
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 22 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 277 +++--
.../db/conf/adapter/ActiveTimeSeriesCounter.java | 146 +++
.../db/conf/adapter/IActiveTimeSeriesCounter.java | 61 ++
.../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 18 +-
.../db/conf/directories/DirectoryManager.java | 77 +-
.../directories/strategy/DirectoryStrategy.java | 33 +-
.../directories/strategy/SequenceStrategy.java | 6 +-
.../iotdb/db/cost/statistic/Measurement.java | 5 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 183 +++-
.../db/engine/cache/CacheHitRatioMonitor.java | 5 +-
.../iotdb/db/engine/cache/DeviceMetaDataCache.java | 38 +-
.../iotdb/db/engine/cache/TsFileMetaDataCache.java | 3 -
.../db/engine/fileSystem/SystemFileFactory.java | 2 +-
.../apache/iotdb/db/engine/flush/FlushManager.java | 7 +-
.../iotdb/db/engine/flush/FlushManagerMBean.java | 2 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 16 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 69 +-
.../iotdb/db/engine/memtable/ChunkBufferPool.java | 115 ---
.../apache/iotdb/db/engine/memtable/IMemTable.java | 10 +-
.../db/engine/memtable/IWritableMemChunk.java | 2 +-
.../db/engine/memtable/MemSeriesLazyMerger.java | 2 +-
.../engine/memtable/TimeValuePairInMemTable.java | 2 +-
.../db/engine/memtable/TimeValuePairSorter.java | 2 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 16 +-
.../iotdb/db/engine/merge/IRecoverMergeTask.java | 4 +-
.../engine/merge/inplace/recover/LogAnalyzer.java | 4 +-
.../merge/inplace/task/InplaceMergeTask.java | 8 +-
.../engine/merge/inplace/task/MergeFileTask.java | 36 +-
.../inplace/task/RecoverInplaceMergeTask.java | 4 +-
.../db/engine/merge/manage/MergeResource.java | 58 +-
.../db/engine/querycontext/QueryDataSource.java | 32 +
.../engine/storagegroup/StorageGroupProcessor.java | 596 +++++++++--
.../db/engine/storagegroup/TsFileProcessor.java | 51 +-
.../db/engine/storagegroup/TsFileResource.java | 83 +-
.../upgrade/UpgradeCheckStatus.java} | 21 +-
.../apache/iotdb/db/engine/upgrade/UpgradeLog.java | 88 ++
.../iotdb/db/engine/upgrade/UpgradeTask.java | 81 ++
.../db/exception/ConfigAdjusterException.java | 18 +-
.../exception/DiskSpaceInsufficientException.java | 8 +-
...eption.java => LoadConfigurationException.java} | 13 +-
.../apache/iotdb/db/exception/MergeException.java | 16 +-
...ysCheckException.java => ProcessException.java} | 17 +-
...on.java => QueryInBatchStatementException.java} | 18 +-
.../iotdb/db/exception/StartupException.java | 19 +-
.../iotdb/db/exception/StorageEngineException.java | 19 +-
.../db/exception/SyncConnectionException.java | 11 +-
....java => SyncDeviceOwnerConflictException.java} | 28 +-
...rorException.java => SystemCheckException.java} | 16 +-
.../db/exception/TsFileProcessorException.java | 22 +-
.../MetadataException.java} | 20 +-
.../TimeseriesAlreadyExistException.java} | 22 +-
.../MTreePathException.java} | 20 +-
.../NotStorageGroupException.java} | 23 +-
.../db/exception/path/PTreePathException.java | 13 +-
.../PathException.java} | 17 +-
.../db/exception/qp/QueryProcessorException.java | 40 -
.../exception/query/IllegalASTFormatException.java | 68 ++
.../{qp => query}/LogicalOperatorException.java | 22 +-
.../{qp => query}/LogicalOptimizeException.java | 20 +-
.../OutOfTTLException.java} | 21 +-
.../QueryProcessException.java} | 20 +-
.../{ => query}/UnSupportedFillTypeException.java | 19 +-
.../runtime/FlushRunTimeException.java} | 12 +-
.../StorageEngineFailureException.java | 17 +-
.../{ => storageGroup}/StorageGroupException.java | 22 +-
.../storageGroup/StorageGroupNotSetException.java | 17 +-
.../storageGroup/StorageGroupPathException.java} | 13 +-
.../StorageGroupProcessorException.java | 29 +-
.../java/org/apache/iotdb/db/metadata/MGraph.java | 152 +--
.../org/apache/iotdb/db/metadata/MManager.java | 531 +++++-----
.../java/org/apache/iotdb/db/metadata/MNode.java | 30 +
.../java/org/apache/iotdb/db/metadata/MTree.java | 482 ++++-----
.../MetaUtils.java} | 50 +-
.../iotdb/db/metadata/MetadataOperationType.java | 3 +-
.../java/org/apache/iotdb/db/metadata/PNode.java | 10 +-
.../java/org/apache/iotdb/db/metadata/PTree.java | 111 +-
.../org/apache/iotdb/db/monitor/IStatistic.java | 2 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 23 +-
.../iotdb/db/monitor/collector/FileSize.java | 2 +-
.../org/apache/iotdb/db/qp/QueryProcessor.java | 61 +-
.../apache/iotdb/db/qp/constant/DatetimeUtils.java | 68 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 24 +-
.../qp/executor/AbstractQueryProcessExecutor.java | 176 +++-
.../db/qp/executor/IQueryProcessExecutor.java | 40 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 358 +++----
.../org/apache/iotdb/db/qp/logical/Operator.java | 3 +-
.../db/qp/logical/crud/BasicFunctionOperator.java | 11 +-
.../db/qp/logical/crud/BasicOperatorType.java | 2 +-
...DeleteOperator.java => DeleteDataOperator.java} | 4 +-
.../iotdb/db/qp/logical/crud/FilterOperator.java | 47 +-
...Operator.java => CreateTimeSeriesOperator.java} | 96 +-
.../DeleteStorageGroupOperator.java} | 29 +-
.../logical/sys/DeleteTimeSeriesOperator.java} | 31 +-
.../logical/sys/LoadConfigurationOperator.java} | 23 +-
.../SetStorageGroupOperator.java} | 30 +-
.../logical/sys/SetTTLOperator.java} | 35 +-
.../logical/sys/ShowOperator.java} | 22 +-
.../logical/sys/ShowTTLOperator.java} | 21 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 2 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 2 +
.../iotdb/db/qp/physical/crud/QueryPlan.java | 6 +-
.../db/qp/physical/sys/CreateTimeSeriesPlan.java | 107 ++
.../sys/DeleteStorageGroupPlan.java} | 33 +-
.../sys/DeleteTimeSeriesPlan.java} | 33 +-
.../physical/sys/LoadConfigurationPlan.java} | 23 +-
.../iotdb/db/qp/physical/sys/MetadataPlan.java | 210 ----
.../sys/SetStorageGroupPlan.java} | 42 +-
.../physical/sys/SetTTLPlan.java} | 52 +-
.../physical/sys/ShowPlan.java} | 40 +-
.../iotdb/db/qp/physical/sys/ShowTTLPlan.java | 27 +
.../iotdb/db/qp/strategy/LogicalGenerator.java | 709 ++++++++-----
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 275 ++---
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 38 +-
.../qp/strategy/optimizer/DnfFilterOptimizer.java | 6 +-
.../db/qp/strategy/optimizer/IFilterOptimizer.java | 6 +-
.../qp/strategy/optimizer/ILogicalOptimizer.java | 2 +-
.../optimizer/MergeSingleFilterOptimizer.java | 2 +-
.../qp/strategy/optimizer/RemoveNotOptimizer.java | 15 +-
.../db/query/aggregation/AggregateFunction.java | 42 +-
.../db/query/aggregation/impl/FirstAggrFunc.java | 6 +-
.../iotdb/db/query/context/QueryContext.java | 14 +
.../iotdb/db/query/control/FileReaderManager.java | 48 +-
.../iotdb/db/query/control/JobFileManager.java | 33 +-
.../db/query/control/QueryResourceManager.java | 2 +-
.../db/query/dataset/DeviceIterateDataSet.java | 6 +-
.../dataset/EngineDataSetWithoutValueFilter.java | 25 +-
.../dataset/{AuthDataSet.java => ListDataSet.java} | 4 +-
.../dataset/groupby/GroupByEngineDataSet.java | 7 +-
.../groupby/GroupByWithValueFilterDataSet.java | 6 +-
.../groupby/GroupByWithoutValueFilterDataSet.java | 17 +-
.../db/query/executor/AggregateEngineExecutor.java | 19 +-
.../iotdb/db/query/executor/EngineExecutor.java | 9 +-
.../iotdb/db/query/executor/EngineQueryRouter.java | 22 +-
.../db/query/executor/FillEngineExecutor.java | 8 +-
.../db/query/executor/IEngineQueryRouter.java | 16 +-
.../db/query/externalsort/ExternalSortJob.java | 15 +-
.../query/externalsort/ExternalSortJobEngine.java | 15 +-
.../db/query/externalsort/ExternalSortJobPart.java | 15 +-
.../externalsort/ExternalSortJobScheduler.java | 15 +-
.../iotdb/db/query/externalsort/LineMerger.java | 15 +-
.../MultiSourceExternalSortJobPart.java | 15 +-
.../externalsort/SimpleExternalSortEngine.java | 19 +-
.../SingleSourceExternalSortJobPart.java | 15 +-
.../adapter/ByTimestampReaderAdapter.java | 15 +-
.../serialize/IExternalSortFileDeserializer.java | 15 +-
.../serialize/IExternalSortFileSerializer.java | 15 +-
.../FixLengthIExternalSortFileDeserializer.java | 15 +-
.../impl/FixLengthTimeValuePairSerializer.java | 15 +-
.../iotdb/db/query/factory/AggreFuncFactory.java | 8 +-
.../java/org/apache/iotdb/db/query/fill/IFill.java | 3 +-
.../org/apache/iotdb/db/query/fill/LinearFill.java | 9 +-
.../query/reader/chunkRelated/ChunkReaderWrap.java | 2 +-
.../fileRelated/UnSealedTsFileIterateReader.java | 2 +-
.../UnSealedTsFileReaderByTimestamp.java | 2 +-
.../resourceRelated/UnseqResourceMergeReader.java | 2 +-
.../UnseqResourceReaderByTimestamp.java | 2 +-
.../SeriesReaderWithoutValueFilter.java | 9 +-
.../query/timegenerator/EngineNodeConstructor.java | 2 +-
.../org/apache/iotdb/db/rescon/MemTablePool.java | 2 +-
.../apache/iotdb/db/rescon/TVListAllocator.java | 5 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 15 +-
.../org/apache/iotdb/db/service/JDBCService.java | 4 +-
.../iotdb/db/service/JDBCServiceEventHandler.java | 13 +-
.../org/apache/iotdb/db/service/JMXService.java | 8 +-
.../apache/iotdb/db/service/MetricsService.java | 2 +-
.../java/org/apache/iotdb/db/service/Monitor.java | 5 +-
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 615 ++++++-----
.../org/apache/iotdb/db/service/UpgradeSevice.java | 118 +++
.../sync/conf/{Constans.java => SyncConstant.java} | 56 +-
.../iotdb/db/sync/conf/SyncSenderConfig.java | 143 ++-
.../iotdb/db/sync/conf/SyncSenderDescriptor.java | 53 +-
.../org/apache/iotdb/db/sync/package-info.java | 39 +
.../iotdb/db/sync/receiver/SyncServerManager.java | 17 +-
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 737 --------------
.../iotdb/db/sync/receiver/load/FileLoader.java | 234 +++++
.../db/sync/receiver/load/FileLoaderManager.java | 213 ++++
.../iotdb/db/sync/receiver/load/IFileLoader.java | 60 ++
.../iotdb/db/sync/receiver/load/ILoadLogger.java | 62 ++
.../iotdb/db/sync/receiver/load/LoadLogger.java | 72 ++
.../iotdb/db/sync/receiver/load/LoadType.java | 11 +-
.../recover/ISyncReceiverLogAnalyzer.java} | 17 +-
.../receiver/recover/ISyncReceiverLogger.java} | 65 +-
.../receiver/recover/SyncReceiverLogAnalyzer.java | 153 +++
.../sync/receiver/recover/SyncReceiverLogger.java | 72 ++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 305 ++++++
.../iotdb/db/sync/sender/SyncFileManager.java | 208 ----
.../apache/iotdb/db/sync/sender/SyncSender.java | 75 --
.../iotdb/db/sync/sender/SyncSenderImpl.java | 545 ----------
.../db/sync/sender/manage/ISyncFileManager.java | 67 ++
.../db/sync/sender/manage/SyncFileManager.java | 202 ++++
.../sender/recover/ISyncSenderLogAnalyzer.java | 52 +
.../db/sync/sender/recover/ISyncSenderLogger.java | 65 ++
.../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 +++
.../db/sync/sender/recover/SyncSenderLogger.java | 72 ++
.../iotdb/db/sync/sender/transfer/ISyncClient.java | 104 ++
.../iotdb/db/sync/sender/transfer/SyncClient.java | 671 ++++++++++++
.../apache/iotdb/db/tools/TsFileSketchTool.java | 82 +-
.../java/org/apache/iotdb/db/tools/WalChecker.java | 10 +-
.../iotdb/db/tools/upgrade/OfflineUpgradeTool.java | 68 ++
.../db/tools/watermark/WatermarkDetector.java | 10 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 122 ++-
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 40 +-
.../IRecoverMergeTask.java => utils/TestOnly.java} | 14 +-
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 10 +-
.../org/apache/iotdb/db/utils/UpgradeUtils.java | 130 +++
.../writelog/manager/MultiFileLogNodeManager.java | 5 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 37 +-
.../writelog/recover/TsFileRecoverPerformer.java | 29 +-
.../conf/adapter/ActiveTimeSeriesCounterTest.java | 122 +++
.../db/conf/adapter/CompressionRatioTest.java | 2 -
.../iotdb/db/conf/adapter/HyperLogLogTest.java | 65 ++
.../adapter/IoTDBConfigDynamicAdapterTest.java | 32 +-
.../strategy/DirectoryStrategyTest.java | 12 +-
.../db/engine/cache/DeviceMetaDataCacheTest.java | 8 +-
.../db/engine/memtable/ChunkBufferPoolTest.java | 84 --
.../db/engine/memtable/MemTableFlushTaskTest.java | 4 +-
.../db/engine/memtable/MemTableTestUtils.java | 2 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 12 +-
.../iotdb/db/engine/merge/MergeOverLapTest.java | 29 +-
.../apache/iotdb/db/engine/merge/MergeTest.java | 30 +-
.../iotdb/db/engine/merge/MergeUpgradeTest.java | 160 +++
.../db/engine/merge/inplace/MergeLogTest.java | 7 +-
.../db/engine/merge/inplace/MergeTaskTest.java | 6 +-
.../engine/modification/DeletionFileNodeTest.java | 28 +-
.../db/engine/modification/DeletionQueryTest.java | 32 +-
.../storagegroup/FileNodeManagerBenchmark.java | 30 +-
.../storagegroup/StorageGroupProcessorTest.java | 34 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 307 ++++++
.../engine/storagegroup/TsFileProcessorTest.java | 14 +-
.../iotdb/db/integration/IoTDBAggregationIT.java | 31 +-
.../iotdb/db/integration/IoTDBAuthorizationIT.java | 184 ++--
.../apache/iotdb/db/integration/IoTDBCloseIT.java | 202 ++++
.../db/integration/IoTDBDeleteStorageGroupIT.java | 156 +++
.../iotdb/db/integration/IoTDBDeletionIT.java | 4 +-
.../{IOTDBFillIT.java => IoTDBFillIT.java} | 2 +-
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 41 +-
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 62 +-
.../db/integration/IoTDBMultiStatementsIT.java | 193 ++++
.../iotdb/db/integration/IoTDBQueryDemoIT.java | 198 ++++
.../iotdb/db/integration/IoTDBQuotedPathIT.java | 114 +++
.../iotdb/db/integration/IoTDBTimeZoneIT.java | 10 +-
.../apache/iotdb/db/integration/IoTDBTtlIT.java | 205 ++++
.../iotdb/db/integration/IoTDBVersionIT.java | 13 +-
.../org/apache/iotdb/db/metadata/MGraphTest.java | 6 +-
.../iotdb/db/metadata/MManagerAdvancedTest.java | 32 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 117 ++-
.../iotdb/db/metadata/MManagerImproveTest.java | 71 +-
.../org/apache/iotdb/db/metadata/MTreeTest.java | 217 ++--
.../org/apache/iotdb/db/metadata/MetadataTest.java | 8 +-
.../iotdb/db/monitor/collector/FileSizeTest.java | 2 +-
.../iotdb/db/qp/bench/QueryParseBenchmark.java | 12 +-
.../iotdb/db/qp/other/TSPlanContextAuthorTest.java | 9 +-
.../db/qp/other/TSPlanContextPropertyTest.java | 9 +-
.../iotdb/db/qp/plan/LogicalPlanSmallTest.java | 58 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 115 ++-
.../org/apache/iotdb/db/qp/plan/QPUpdateTest.java | 37 +-
.../iotdb/db/qp/plan/TestConcatOptimizer.java | 17 +-
.../iotdb/db/qp/strategy/LogicalGeneratorTest.java | 2 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 43 +-
.../db/query/control/FileReaderManagerTest.java | 6 +-
.../query/externalsort/ExternalSortEngineTest.java | 2 +-
.../db/query/externalsort/FakeChunkReaderWrap.java | 2 +-
...ExternalSortFileSerializerDeserializerTest.java | 2 +-
.../iotdb/db/query/reader/ReaderTestHelper.java | 10 +-
.../fileRelated/UnSealedTsFileReaderTest.java | 14 +-
.../resourceRelated/SeqResourceReaderTest.java | 4 +-
.../resourceRelated/UnseqResourceReaderTest.java | 7 +-
.../query/reader/universal/FakedSeriesReader.java | 2 +-
.../db/sql/DatetimeQueryDataSetUtilsTest.java | 2 +-
.../org/apache/iotdb/db/sql/TqlParserTest.java | 1065 +++++++++++---------
.../db/sync/receiver/load/FileLoaderTest.java | 330 ++++++
.../recover/SyncReceiverLogAnalyzerTest.java | 212 ++++
.../receiver/recover/SyncReceiverLoggerTest.java | 112 ++
.../db/sync/sender/MultipleClientSyncTest.java | 226 -----
.../iotdb/db/sync/sender/SingleClientSyncTest.java | 559 ----------
.../iotdb/db/sync/sender/SyncFileManagerTest.java | 374 -------
.../db/sync/sender/manage/SyncFileManagerTest.java | 295 ++++++
.../sender/recover/SyncSenderLogAnalyzerTest.java | 163 +++
.../sync/sender/recover/SyncSenderLoggerTest.java | 111 ++
.../db/sync/sender/transfer/SyncClientTest.java | 140 +++
.../apache/iotdb/db/sync/test/SyncTestClient1.java | 253 -----
.../apache/iotdb/db/sync/test/SyncTestClient2.java | 262 -----
.../apache/iotdb/db/sync/test/SyncTestClient3.java | 282 ------
.../java/org/apache/iotdb/db/sync/test/Utils.java | 44 -
.../apache/iotdb/db/tools/IoTDBWatermarkTest.java | 15 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 12 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 5 +
.../iotdb/db/{sync/test => utils}/RandomNum.java | 2 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 10 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 10 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 10 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 13 +-
service-rpc/rpc-changelist.md | 64 +-
.../org/apache/iotdb/rpc/IoTDBRPCException.java | 2 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 2 +-
.../org/apache/iotdb/rpc/SynchronizedHandler.java | 2 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 61 +-
service-rpc/src/main/thrift/rpc.thrift | 45 +-
service-rpc/src/main/thrift/sync.thrift | 23 +-
session/pom.xml | 4 +
.../main/java/org/apache/iotdb/session/Config.java | 8 +-
.../iotdb/session/IoTDBSessionException.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 73 +-
.../org/apache/iotdb/session/SessionDataSet.java | 22 +-
.../org/apache/iotdb/session/SessionUtils.java | 112 +-
.../org/apache/iotdb/session/IoTDBSessionIT.java | 86 +-
spark-iotdb-connector/Readme.md | 19 +-
spark-iotdb-connector/pom.xml | 6 +
.../iotdb/{sparkdb => spark/db}/SQLConstant.java | 2 +-
.../iotdb/{sparkdb => spark/db}/Converter.scala | 2 +-
.../{sparkdb => spark/db}/DefaultSource.scala | 2 +-
.../iotdb/{sparkdb => spark/db}/IoTDBOptions.scala | 2 +-
.../iotdb/{sparkdb => spark/db}/IoTDBRDD.scala | 8 +-
.../{sparkdb => spark/db}/IoTDBRelation.scala | 2 +-
.../iotdb/{sparkdb => spark/db}/Transformer.scala | 66 +-
.../iotdb/{sparkdb => spark/db}/package.scala | 6 +-
.../{sparkdb => spark/db}/EnvironmentUtils.java | 2 +-
.../iotdb/{sparkdb => spark/db}/IoTDBTest.scala | 2 +-
spark-tsfile/README.md | 109 +-
.../org/apache/iotdb/spark/tsfile/Converter.scala | 2 +-
.../apache/iotdb/spark/tsfile/DefaultSource.scala | 16 +-
.../apache/iotdb/spark/tsfile/Transformer.scala | 56 +-
.../org/apache/iotdb/spark/tsfile/package.scala | 6 +-
.../org/apache/iotdb/spark/tsfile/TSFileSuit.scala | 4 +-
tsfile/format-changelist.md | 22 +-
.../apache/iotdb/tsfile/common/cache/LRUCache.java | 45 +-
.../iotdb/tsfile/common/conf/TSFileConfig.java | 320 ++++--
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 4 +-
.../tsfile/common/constant/TsFileConstant.java | 2 +
.../iotdb/tsfile/compress/IUnCompressor.java | 8 +-
.../iotdb/tsfile/encoding/decoder/Decoder.java | 11 +-
.../tsfile/encoding/decoder/PlainDecoder.java | 20 +
.../iotdb/tsfile/file/footer/ChunkGroupFooter.java | 2 +-
.../iotdb/tsfile/file/header/ChunkHeader.java | 67 +-
.../iotdb/tsfile/file/metadata/TsDigest.java | 567 ++++++-----
.../iotdb/tsfile/file/metadata/TsFileMetaData.java | 727 +++++++------
.../file/metadata/statistics/BinaryStatistics.java | 8 +-
.../metadata/statistics/BooleanStatistics.java | 4 +-
.../file/metadata/statistics/DoubleStatistics.java | 4 +-
.../file/metadata/statistics/FloatStatistics.java | 4 +-
.../metadata/statistics/IntegerStatistics.java | 4 +-
.../file/metadata/statistics/LongStatistics.java | 4 +-
.../file/metadata/statistics/NoStatistics.java | 4 +-
.../file/metadata/statistics/Statistics.java | 14 +-
.../org/apache/iotdb/tsfile/fileSystem/FSType.java | 2 +-
.../fileOutputFactory/HDFSOutputFactory.java | 1 +
.../iotdb/tsfile/read/TsFileSequenceReader.java | 136 ++-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 10 +-
.../org/apache/iotdb/tsfile/read/common/Field.java | 13 +-
.../org/apache/iotdb/tsfile/read/common/Path.java | 147 ++-
.../tsfile/read/controller/ChunkLoaderImpl.java | 2 +-
.../read/controller/MetadataQuerierByFileImpl.java | 6 +-
.../read/expression/util/ExpressionOptimizer.java | 6 +-
.../query/dataset/DataSetWithoutTimeGenerator.java | 10 +-
.../tsfile/read/query/dataset/QueryDataSet.java | 6 +-
.../tsfile/read/query/executor/TsFileExecutor.java | 16 +
.../tsfile/read/reader/chunk/ChunkReader.java | 11 +-
.../tool/upgrade/TsfileUpgradeToolV0_8_0.java | 571 +++++++++++
.../iotdb/tsfile/tool/upgrade/UpgradeTool.java | 108 ++
.../org/apache/iotdb/tsfile/utils/BloomFilter.java | 143 +++
.../apache/iotdb/tsfile/utils/Murmur128Hash.java | 167 +++
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 56 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 1 +
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 253 -----
.../tsfile/write/chunk/ChunkGroupWriterImpl.java | 3 +-
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 413 ++++----
.../iotdb/tsfile/write/chunk/IChunkWriter.java | 2 +-
.../apache/iotdb/tsfile/write/page/PageWriter.java | 185 +++-
.../write/writer/RestorableTsFileIOWriter.java | 1 -
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 52 +-
.../resources/tsfile-format.properties.template | 55 -
.../iotdb/tsfile/file/header/PageHeaderTest.java | 2 +-
.../tsfile/file/metadata/TsFileMetaDataTest.java | 10 +-
.../iotdb/tsfile/read/ReadOnlyTsFileTest.java | 21 +-
.../apache/iotdb/tsfile/read/common/PathTest.java | 28 +-
.../iotdb/tsfile/read/reader/PageReaderTest.java | 19 +-
.../apache/iotdb/tsfile/utils/BloomFilterTest.java | 66 ++
.../iotdb/tsfile/write/TsFileReadWriteTest.java | 52 +-
.../iotdb/tsfile/write/series/PageWriterTest.java | 85 --
.../iotdb/tsfile/write/writer/PageWriterTest.java | 219 ++++
572 files changed, 23686 insertions(+), 15156 deletions(-)
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e12d281,8cdd1df..67292d5
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@@ -20,12 -20,11 +20,11 @@@ package org.apache.iotdb.db.conf
import java.io.File;
import java.time.ZoneId;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+ import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
+ import org.apache.iotdb.db.exception.LoadConfigurationException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 4f300b5,1de6e2c..ebb2fef
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@@ -18,10 -18,17 +18,18 @@@
*/
package org.apache.iotdb.db.conf;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.time.ZoneId;
+ import java.util.Properties;
+ import org.apache.iotdb.db.conf.directories.DirectoryManager;
+ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -294,20 -267,65 +268,68 @@@ public class IoTDBDescriptor
conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
- conf.setTsFileStorageFs(properties.getProperty("tsfile_storage_fs"));
- conf.setHdfsIp(properties.getProperty("hdfs_ip"));
- conf.setHdfsPort(properties.getProperty("hdfs_port"));
+ conf.setTsFileStorageFs(properties.getProperty("tsfile_storage_fs",
+ conf.getTsFileStorageFs().toString()));
+ conf.setCoreSitePath(
+ properties.getProperty("core_site_path", conf.getCoreSitePath()));
+ conf.setHdfsSitePath(
+ properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+ conf.setHdfsIp(properties.getProperty("hdfs_ip").split(","));
+ conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
+ conf.setDfsNameServices(
+ properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
+ conf.setDfsHaNamenodes(properties.getProperty("dfs_ha_namenodes").split(","));
+ conf.setDfsHaAutomaticFailoverEnabled(
+ Boolean.parseBoolean(properties.getProperty("dfs_ha_automatic_failover_enabled",
+ String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
+ conf.setDfsClientFailoverProxyProvider(
+ properties.getProperty("dfs_client_failover_proxy_provider",
+ conf.getDfsClientFailoverProxyProvider()));
+ conf.setUseKerberos(Boolean.parseBoolean(
+ properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+ conf.setKerberosKeytabFilePath(
+ properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+ conf.setKerberosPrincipal(
+ properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+
+ conf.setDefaultTTL(Long.parseLong(properties.getProperty("default_ttl",
+ String.valueOf(conf.getDefaultTTL()))));
// At the same time, set TSFileConfig
- TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(properties.getProperty("tsfile_storage_fs"));
- TSFileDescriptor.getInstance().getConfig().setHdfsIp(properties.getProperty("hdfs_ip"));
+ TSFileDescriptor.getInstance().getConfig()
+ .setTSFileStorageFs(properties.getProperty("tsfile_storage_fs"));
+ TSFileDescriptor.getInstance().getConfig().setKerberosKeytabFilePath(
+ properties.getProperty("core_site_path", conf.getCoreSitePath()));
+ TSFileDescriptor.getInstance().getConfig().setKerberosPrincipal(
+ properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+ TSFileDescriptor.getInstance().getConfig()
+ .setHdfsIp(properties.getProperty("hdfs_ip").split(","));
TSFileDescriptor.getInstance().getConfig().setHdfsPort(properties.getProperty("hdfs_port"));
+ TSFileDescriptor.getInstance().getConfig()
+ .setDfsNameServices(properties.getProperty("dfs_nameservices"));
+ TSFileDescriptor.getInstance().getConfig()
+ .setDfsHaNamenodes(properties.getProperty("dfs_ha_namenodes").split(","));
+ TSFileDescriptor.getInstance().getConfig().setDfsHaAutomaticFailoverEnabled(
+ Boolean.parseBoolean(properties.getProperty("dfs_ha_automatic_failover_enabled")));
+ TSFileDescriptor.getInstance().getConfig().setDfsClientFailoverProxyProvider(
+ properties.getProperty("dfs_client_failover_proxy_provider"));
+ TSFileDescriptor.getInstance().getConfig().setUseKerberos(Boolean.parseBoolean(
+ properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+ TSFileDescriptor.getInstance().getConfig().setKerberosKeytabFilePath(
+ properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+ TSFileDescriptor.getInstance().getConfig().setKerberosPrincipal(
+ properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+
+ // set tsfile-format config
+ loadTsFileProps(properties);
+ conf.setMergeFileStrategy(MergeFileStrategy.valueOf(properties.getProperty(
+ "merge_file_strategy", conf.getMergeFileStrategy().name())));
+
+ } catch (FileNotFoundException e) {
+ logger.warn("Fail to find config file {}", url, e);
} catch (IOException e) {
- logger.warn("Cannot load config file because, use default configuration", e);
+ logger.warn("Cannot load config file, use default configuration", e);
} catch (Exception e) {
logger.warn("Incorrect format in config file, use default configuration", e);
} finally {
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
index 1ae7e21,874a91b..6727191
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
@@@ -17,11 -17,11 +17,11 @@@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.selector;
+package org.apache.iotdb.db.engine.merge;
-public enum MergeFileStrategy {
- MAX_SERIES_NUM,
- MAX_FILE_NUM,
- // TODO: HOW?
- TRADE_OFF,
+import java.io.IOException;
- import org.apache.iotdb.db.exception.MetadataErrorException;
++import org.apache.iotdb.db.exception.metadata.MetadataException;
+
+public interface IRecoverMergeTask {
- void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException;
++ void recoverMerge(boolean continueMerge) throws IOException, MetadataException;
}
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
index 9f3a44b,b156e3c..ae1411e
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
@@@ -30,9 -30,9 +30,9 @@@ import java.util.Map.Entry
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
index 8f747e7,35eabd0..d4a6be9
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
@@@ -25,16 -25,13 +25,16 @@@ import java.util.ArrayList
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.MergeCallback;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@@ -160,12 -156,15 +160,16 @@@ public class InplaceMergeTask implement
}
for (TsFileResource seqFile : resource.getSeqFiles()) {
- File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
+ File mergeFile = FSFactoryProducer.getFSFactory()
+ .getFile(seqFile.getFile().getPath() + MERGE_SUFFIX);
mergeFile.delete();
+ seqFile.setMerging(false);
+ }
+ for (TsFileResource unseqFile : resource.getUnseqFiles()) {
+ unseqFile.setMerging(false);
}
- File logFile = new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, InplaceMergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
// make sure merge.log is not deleted until unseqFiles are cleared so that when system
// reboots, the undeleted files can be deleted again
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
index 3cb7472,7b179fa..3f551c5
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
@@@ -17,8 -17,11 +17,11 @@@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.task;
+package org.apache.iotdb.db.engine.merge.inplace.task;
+ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
index e6a79b3,00ce1e7..821dc4a
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
@@@ -28,15 -28,12 +28,15 @@@ import java.util.HashMap
import java.util.List;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer;
-import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
-import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.IRecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.MergeCallback;
+import org.apache.iotdb.db.engine.merge.inplace.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.inplace.recover.LogAnalyzer.Status;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Path;
@@@ -61,8 -58,8 +61,8 @@@ public class RecoverInplaceMergeTask ex
super(seqFiles, unseqFiles, storageGroupSysDir, callback, taskName, fullMerge, storageGroupName);
}
- public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
+ public void recoverMerge(boolean continueMerge) throws IOException, MetadataException {
- File logFile = new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, InplaceMergeLogger.MERGE_LOG_NAME);
if (!logFile.exists()) {
logger.info("{} no merge.log, merge recovery ends", taskName);
return;
diff --cc server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index fca1f7f,b982cb4..7995b8a
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@@ -19,6 -19,18 +19,18 @@@
package org.apache.iotdb.db.engine.merge.manage;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
++import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.stream.Collectors;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
@@@ -36,17 -46,6 +46,7 @@@ import org.apache.iotdb.tsfile.write.ch
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.IOException;
- import java.util.*;
- import java.util.Map.Entry;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.stream.Collectors;
-
- import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
/**
* MergeResource manages files and caches of readers, writers, MeasurementSchemas and
diff --cc server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 37b230c,fc64ddc..47b9631
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@@ -39,19 -38,18 +40,21 @@@ import java.util.concurrent.Callable
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
+ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.merge.IRecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.RecoverInplaceMergeTask;
+ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
-import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger;
+import org.apache.iotdb.db.engine.merge.squeeze.task.RecoverSqueezeMergeTask;
+import org.apache.iotdb.db.engine.merge.squeeze.task.SqueezeMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@@ -210,8 -211,8 +216,8 @@@ public class StorageGroupProcessor
recover();
}
- private void recover() throws ProcessorException {
+ private void recover() throws StorageGroupProcessorException {
- logger.info("recover Storage Group {}", storageGroupName);
+ logger.info("recover Storage Group {}", storageGroupName);
try {
// collect TsFiles from sequential and unsequential data directory
@@@ -227,22 -230,12 +235,21 @@@
if (mergingMods.exists()) {
mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
- storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
- logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
- recoverMergeTask
- .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+
+ Class cls = judgeMergeStrategy();
+ if (cls != null) {
+ IRecoverMergeTask recoverMergeTask;
+ if (cls == InplaceMergeTask.class) {
+ recoverMergeTask = new RecoverInplaceMergeTask(seqTsFiles, unseqTsFiles,
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
+ } else {
+ recoverMergeTask = new RecoverSqueezeMergeTask(seqTsFiles, unseqTsFiles,
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName, storageGroupName);
+ }
+ logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
+ recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+ }
-
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
@@@ -891,15 -1007,14 +1034,17 @@@
}
long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
- MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList);
+ long timeLowerBound = System.currentTimeMillis() - dataTTL;
+ MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList,
+ timeLowerBound);
- IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
+ // TODO: choose a better strategy accordingly
+ MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+
+ IMergeFileSelector fileSelector = strategy.getFileSelector(mergeResource, budget);
try {
- List[] mergeFiles = fileSelector.select();
- if (mergeFiles.length == 0) {
+ fileSelector.select();
+ if (fileSelector.getSelectedSeqFiles().size() + fileSelector.getSelectedUnseqFiles().size() <= 1) {
logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
budget);
return;
@@@ -911,10 -1026,18 +1056,17 @@@
// cached during selection
mergeResource.setCacheDeviceMeta(true);
+ Callable<Void> mergeTask = strategy.getMergeTask(mergeResource, storageGroupSysDir.getPath(),
+ this::mergeEndAction, taskName, fileSelector.getConcurrentMergeNum(),
+ storageGroupName , fullMerge);
- mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
++ mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
+ for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
+ tsFileResource.setMerging(true);
+ }
+ for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
+ tsFileResource.setMerging(true);
+ }
+
- MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
- this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
- storageGroupName);
- mergingModification = new ModificationFile(
- storageGroupSysDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
if (logger.isInfoEnabled()) {
logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@@ -932,147 -1054,291 +1084,350 @@@
}
}
- private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
- MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
- switch (strategy) {
- case MAX_FILE_NUM:
- return new MaxFileMergeFileSelector(resource, budget);
- case MAX_SERIES_NUM:
- return new MaxSeriesMergeFileSelector(resource, budget);
- default:
- throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy);
- }
- }
-
- private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
- mergeLock.writeLock().lock();
- try {
- unSequenceFileList.removeAll(unseqFiles);
- } finally {
- mergeLock.writeLock().unlock();
- }
-
- for (TsFileResource unseqFile : unseqFiles) {
- unseqFile.getWriteQueryLock().writeLock().lock();
- try {
- unseqFile.remove();
- } finally {
- unseqFile.getWriteQueryLock().writeLock().unlock();
- }
- }
- }
-
+ private void updateMergeModification(TsFileResource seqFile) {
+ seqFile.getWriteQueryLock().writeLock().lock();
+ try {
+ // remove old modifications and write modifications generated during merge
+ seqFile.removeModFile();
+ if (mergingModification != null) {
+ for (Modification modification : mergingModification.getModifications()) {
+ seqFile.getModFile().write(modification);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+ seqFile.getFile(), e);
+ } finally {
+ seqFile.getWriteQueryLock().writeLock().unlock();
+ }
+ }
+
+ private void removeMergingModification() {
+ try {
+ if (mergingModification != null) {
+ mergingModification.remove();
+ mergingModification = null;
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot remove merging modification ", storageGroupName, e);
+ }
+ }
+
- protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
- File mergeLog) {
- logger.info("{} a merge task is ending...", storageGroupName);
-
- if (unseqFiles.isEmpty()) {
- // merge runtime exception arose, just end this merge
- isMerging = false;
- logger.info("{} a merge task abnormally ends", storageGroupName);
- return;
- }
+ private void handleInplaceMerge(List<TsFileResource> seqFiles,
+ List<TsFileResource> unseqFiles, File mergeLog) {
removeUnseqFiles(unseqFiles);
--
for (int i = 0; i < seqFiles.size(); i++) {
TsFileResource seqFile = seqFiles.get(i);
- seqFile.getMergeQueryLock().writeLock().lock();
mergeLock.writeLock().lock();
try {
- logger.debug("{} is updating the {} merged file's modification file", storageGroupName, i);
- try {
- // remove old modifications and write modifications generated during merge
- seqFile.removeModFile();
- if (mergingModification != null) {
- for (Modification modification : mergingModification.getModifications()) {
- seqFile.getModFile().write(modification);
- }
- }
- } catch (IOException e) {
- logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
- seqFile.getFile(), e);
- }
+ updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
- try {
- if (mergingModification != null) {
- mergingModification.remove();
- mergingModification = null;
- }
- } catch (IOException e) {
- logger.error("{} cannot remove merging modification ", storageGroupName, e);
- }
+ removeMergingModification();
isMerging = false;
+ mergeLog.delete();
}
} finally {
- mergeLog.delete();
- seqFile.getMergeQueryLock().writeLock().unlock();
mergeLock.writeLock().unlock();
}
}
logger.info("{} a merge task ends", storageGroupName);
}
+ private void handleSqueezeMerge(
+ List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File mergeLog,
+ TsFileResource newFile) {
+
+ // make sure no queries are holding the seqFiles
+ for (TsFileResource seqFile : seqFiles) {
- seqFile.getMergeQueryLock().writeLock().lock();
++ seqFile.getWriteQueryLock().writeLock().lock();
+ }
+ // block new queries and insertions to prevent the seqFiles from changing
+ writeLock();
+ mergeLock.writeLock().lock();
+ try {
+ removeUnseqFiles(unseqFiles);
+ // insert the new file into a proper place
+ List<TsFileResource> newSeqFiles = new ArrayList<>();
+ int startIdx = sequenceFileList.indexOf(seqFiles.get(0));
+ int endIdx = sequenceFileList.indexOf(seqFiles.get(seqFiles.size() - 1));
+ for (int i = 0; i < startIdx; i++) {
+ newSeqFiles.add(sequenceFileList.get(i));
+ }
+ newSeqFiles.add(newFile);
+ for (int i = endIdx + 1; i < sequenceFileList.size(); i++) {
+ newSeqFiles.add(sequenceFileList.get(i));
+ }
+ sequenceFileList = newSeqFiles;
+ // move modifications generated during merge into the new file
+ if (mergingModification != null) {
+ logger.debug("{} is updating the merged file's modification file", storageGroupName);
+ for (Modification modification : mergingModification.getModifications()) {
+ newFile.getModFile().write(modification);
+ }
+ mergingModification.remove();
+ mergingModification = null;
+ }
+ // remove old seqFiles
+ for (TsFileResource seqFile : seqFiles) {
+ seqFile.remove();
- seqFile.getMergeQueryLock().writeLock().unlock();
++ seqFile.getWriteQueryLock().writeLock().unlock();
+ }
+ mergeLog.delete();
+ } catch (IOException e) {
+ logger.error("{} fails to do the after merge action,", storageGroupName, e);
+ } finally {
+ isMerging = false;
+ writeUnlock();
+ mergeLock.writeLock().unlock();
+ logger.info("{} a merge task ends", storageGroupName);
+ }
+ }
+
+ private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
+ mergeLock.writeLock().lock();
+ try {
+ unSequenceFileList.removeAll(unseqFiles);
+ } finally {
+ mergeLock.writeLock().unlock();
+ }
+ for (TsFileResource unseqFile : unseqFiles) {
- unseqFile.getMergeQueryLock().writeLock().lock();
++ unseqFile.getWriteQueryLock().writeLock().lock();
+ try {
+ unseqFile.remove();
+ } finally {
- unseqFile.getMergeQueryLock().writeLock().unlock();
++ unseqFile.getWriteQueryLock().writeLock().unlock();
+ }
+ }
+ }
+
+ void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
+ File mergeLog, TsFileResource newFile) {
+ logger.info("{} a merge task is ending...", storageGroupName);
+
+ if (unseqFiles.isEmpty()) {
+ // merge runtime exception arose, just end this merge
+ mergeLock.writeLock().lock();
+ try {
+ if (mergingModification != null) {
+ logger.debug("{} is updating the merged file's modification file", storageGroupName);
+ mergingModification.remove();
+ mergingModification = null;
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot remove merge modifications after merge abnormally ends",
+ storageGroupName, e);
+ } finally {
+ isMerging = false;
+ logger.info("{} a merge task abnormally ends", storageGroupName);
+ mergeLock.writeLock().unlock();
+ }
+ return;
+ }
+
+ if (newFile == null) {
+ handleInplaceMerge(seqFiles, unseqFiles, mergeLog);
+ } else {
+ handleSqueezeMerge(seqFiles, unseqFiles, mergeLog, newFile);
+ }
+ }
+
+ /**
+ * Load a new tsfile to storage group processor
+ *
+ * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+ * or unsequence list.
+ *
+ * Secondly, execute the loading process by the type.
+ *
+ * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @param newTsFileResource tsfile resource
+ * @UsedBy sync module.
+ */
+ public void loadNewTsFile(TsFileResource newTsFileResource)
+ throws TsFileProcessorException {
+ File tsfileToBeInserted = newTsFileResource.getFile();
+ writeLock();
+ mergeLock.writeLock().lock();
+ try {
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ getBinarySearchIndex(newTsFileResource));
+ updateLatestTimeMap(newTsFileResource);
+ } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+ logger.error("Failed to append the tsfile {} to storage group processor {}.",
+ tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ throw new TsFileProcessorException(e);
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Get binary search index in @code{sequenceFileList}
+ *
+ * @return right index to insert
+ */
+ private int getBinarySearchIndex(TsFileResource tsFileResource) {
+ if (sequenceFileList.isEmpty()) {
+ return 0;
+ }
+ long targetTsFileTime = Long.parseLong(
+ tsFileResource.getFile().getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ int s = 0;
+ int e = sequenceFileList.size() - 1;
+ while (s <= e) {
+ int m = s + ((e - s) >> 1);
+ long currentTsFileTime = Long.parseLong(sequenceFileList.get(m).getFile().getName()
+ .split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+ if (currentTsFileTime >= targetTsFileTime) {
+ e = m - 1;
+ } else {
+ s = m + 1;
+ }
+ }
+ return s;
+ }
+
+ /**
+ * Update latest time in latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @UsedBy sync module
+ */
+ private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+ for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) {
+ String device = entry.getKey();
+ long endTime = newTsFileResource.getEndTimeMap().get(device);
+ if (!latestTimeForEachDevice.containsKey(device)
+ || latestTimeForEachDevice.get(device) < endTime) {
+ latestTimeForEachDevice.put(device, endTime);
+ }
+ if (!latestFlushedTimeForEachDevice.containsKey(device)
+ || latestFlushedTimeForEachDevice.get(device) < endTime) {
+ latestFlushedTimeForEachDevice.put(device, endTime);
+ }
+ }
+ }
+
+ /**
+ * Execute the loading process by the type.
+ *
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
+ * @param index the index in sequenceFileList/unSequenceFileList
+ * @UsedBy sync module
+ */
+ private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+ TsFileResource tsFileResource, int index)
+ throws TsFileProcessorException, DiskSpaceInsufficientException {
+ File targetFile;
+ switch (type) {
+ case LOAD_UNSEQUENCE:
+ targetFile =
+ new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ syncedTsFile.getParentFile().getName() + File.separatorChar + syncedTsFile
+ .getName());
+ tsFileResource.setFile(targetFile);
+ unSequenceFileList.add(index, tsFileResource);
+ logger
+ .info("Load tsfile in unsequence list, move file from {} to {}",
+ syncedTsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
+ break;
+ case LOAD_SEQUENCE:
+ targetFile =
+ new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ syncedTsFile.getParentFile().getName() + File.separatorChar + syncedTsFile
+ .getName());
+ tsFileResource.setFile(targetFile);
+ sequenceFileList.add(index, tsFileResource);
+ logger
+ .info("Load tsfile in sequence list, move file from {} to {}",
+ syncedTsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
+ break;
+ default:
+ throw new TsFileProcessorException(
+ String.format("Unsupported type of loading tsfile : %s", type));
+ }
+
+ // move file from sync dir to data dir
+ if (!targetFile.getParentFile().exists()) {
+ targetFile.getParentFile().mkdirs();
+ }
+ if (syncedTsFile.exists() && !targetFile.exists()) {
+ try {
+ FileUtils.moveFile(syncedTsFile, targetFile);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
+ syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+ }
+ }
+ try {
+ FileUtils.moveFile(new File(syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+ new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+ } catch (IOException e) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
+ new File(syncedTsFile + TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(),
+ new File(targetFile + TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Delete tsfile if it exists.
+ *
+ * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+ *
+ * Secondly, delete the tsfile and .resource file.
+ *
+ * @param deletedTsfile tsfile to be deleted
+ * @UsedBy sync module.
+ */
+ public void deleteTsfile(File deletedTsfile) {
+ writeLock();
+ mergeLock.writeLock().lock();
+ TsFileResource deletedTsFileResource = null;
+ try {
+ Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
+ while (sequenceIterator.hasNext()) {
+ TsFileResource sequenceResource = sequenceIterator.next();
+ if (sequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+ deletedTsFileResource = sequenceResource;
+ sequenceIterator.remove();
+ break;
+ }
+ }
+ if (deletedTsFileResource == null) {
+ Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
+ while (unsequenceIterator.hasNext()) {
+ TsFileResource unsequenceResource = unsequenceIterator.next();
+ if (unsequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+ deletedTsFileResource = unsequenceResource;
+ unsequenceIterator.remove();
+ break;
+ }
+ }
+ }
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ if (deletedTsFileResource == null) {
+ return;
+ }
+ deletedTsFileResource.getWriteQueryLock().writeLock().lock();
+ try {
+ logger.info("Delete tsfile {} in sync loading process.", deletedTsFileResource.getFile());
+ deletedTsFileResource.remove();
+ } finally {
+ deletedTsFileResource.getWriteQueryLock().writeLock().unlock();
+ }
+ }
+
public TsFileProcessor getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
diff --cc server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 11bec4a,bf7b9d7..0e83e6a
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@@ -24,16 -25,14 +24,16 @@@ import static org.junit.Assert.assertEq
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
- import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+ import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.squeeze.task.SqueezeMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
- import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.path.PathException;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@@ -67,9 -66,12 +67,12 @@@ public class MergeOverLapTest extends M
}
@Override
- void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+ protected void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
- File file = SystemFileFactory.INSTANCE.getFile("seq" + i + "-" + i + ".tsfile");
+ File file = new File(
+ i + "seq" + IoTDBConstant.TSFILE_NAME_SEPARATOR + i + IoTDBConstant.TSFILE_NAME_SEPARATOR
+ + i + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
+ + ".tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
seqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum, 0);
@@@ -150,8 -142,8 +159,7 @@@
QueryContext context = new QueryContext();
Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
SeqResourceIterateReader tsFilesReader = new SeqResourceIterateReader(path,
- Collections.singletonList(mergedFile),
- Collections.singletonList(seqResources.get(0)),
-- null, context);
++ Collections.singletonList(mergedFile), null, context);
int cnt = 0;
try {
while (tsFilesReader.hasNext()) {
@@@ -161,7 -153,7 +169,8 @@@
assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
}
}
+ assertEquals(expected, cnt);
+ assertEquals(1000, cnt);
} finally {
tsFilesReader.close();
}
diff --cc server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 8222cbc,d7327c0..1166b6d
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@@ -31,9 -32,9 +32,9 @@@ import org.apache.iotdb.db.engine.cache
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
- import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.path.PathException;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@@ -41,9 -42,6 +42,7 @@@ import org.apache.iotdb.tsfile.exceptio
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
- import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
- import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@@ -94,10 -91,9 +93,10 @@@ public abstract class MergeTest
MManager.getInstance().clear();
EnvironmentUtils.cleanAllDir();
MergeManager.getINSTANCE().stop();
+ ChunkProviderExecutor.getINSTANCE().close();
}
- private void prepareSeries() throws MetadataErrorException, PathErrorException {
+ private void prepareSeries() throws MetadataException, PathException {
measurementSchemas = new MeasurementSchema[measurementNum];
for (int i = 0; i < measurementNum; i++) {
measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE,
@@@ -118,10 -114,13 +117,14 @@@
}
}
- protected void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException,
- WriteProcessException {
++
+ void prepareFiles(int seqFileNum, int unseqFileNum)
+ throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
- File file = SystemFileFactory.INSTANCE.getFile("seq" + i + "-" + i + ".tsfile");
+ File file = new File(
+ i + "seq" + IoTDBConstant.TSFILE_NAME_SEPARATOR + i + IoTDBConstant.TSFILE_NAME_SEPARATOR
+ + i + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
+ + ".tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
seqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum, 0);
diff --cc server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
index e3c80db,8d1e6d7..9299417
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
@@@ -28,12 -28,11 +28,12 @@@ import java.io.FileReader
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
- import org.apache.iotdb.db.exception.MetadataErrorException;
- import org.apache.iotdb.db.exception.PathErrorException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.junit.After;
@@@ -42,10 -41,10 +42,10 @@@ import org.junit.Test
public class MergeLogTest extends MergeTest {
- File tempSGDir;
+ private File tempSGDir;
@Before
- public void setUp() throws IOException, WriteProcessException, MetadataErrorException, PathErrorException {
+ public void setUp() throws IOException, WriteProcessException, MetadataException, PathException {
super.setUp();
tempSGDir = new File("tempSG");
tempSGDir.mkdirs();
diff --cc server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
index 9801160,af8e093..f6028d8
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
@@@ -26,12 -26,11 +26,12 @@@ import java.io.IOException
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
- import org.apache.iotdb.db.exception.MetadataErrorException;
- import org.apache.iotdb.db.exception.PathErrorException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
diff --cc server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 1d9502b,4b14708..f788271
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@@ -21,24 -21,18 +21,21 @@@ package org.apache.iotdb.db.engine.stor
import static org.junit.Assert.assertFalse;
import java.io.File;
+ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
- import java.util.ArrayList;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-
- import org.apache.iotdb.db.exception.ProcessorException;
- import org.apache.iotdb.db.exception.qp.QueryProcessorException;
+ import org.apache.iotdb.db.exception.query.QueryProcessException;
+ import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
-
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
- import org.apache.iotdb.db.query.control.JobFileManager;
- import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@@ -182,9 -179,7 +182,9 @@@ public class StorageGroupProcessorTest
}
@Test
- public void testMerge() throws QueryProcessorException {
+ public void testMerge() throws QueryProcessException {
+ MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+ IoTDBDescriptor.getInstance().getConfig().setMergeFileStrategy(MergeFileStrategy.INPLACE_MAX_SERIES_NUM);
mergeLock = new AtomicLong(0);
for (int j = 21; j <= 30; j++) {
diff --cc server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 4e01348,eee2800..ba5beac
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@@ -162,9 -163,9 +166,10 @@@ public class EnvironmentUtils
StorageEngine.getInstance().reset();
MultiFileLogNodeManager.getInstance().start();
FlushManager.getInstance().start();
+ MergeManager.getINSTANCE().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+ ChunkProviderExecutor.getINSTANCE().start();
}
private static void createAllDir() {
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
index 9a3d2b0,693c51b..26c3d6d
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
@@@ -21,9 -21,10 +21,11 @@@ package org.apache.iotdb.tsfile.common.
public class TsFileConstant {
public static final String TSFILE_SUFFIX = ".tsfile";
+ public static final String TSFILE_SEPARATOR = "-";
public static final String TSFILE_HOME = "TSFILE_HOME";
public static final String TSFILE_CONF = "TSFILE_CONF";
+ public static final String PATH_ROOT = "root";
+ public static final String PATH_UPGRADE = "tmp";
public static final String PATH_SEPARATOR = ".";
public static final String PATH_SEPARATER_NO_REGEX = "\\.";
public static final String DEFAULT_DELTA_TYPE = "default_delta_type";
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 79d940d,e8a810b..f495c61
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@@ -31,13 -33,10 +33,14 @@@ public class Chunk
/**
* All data with timestamp <= deletedAt are considered deleted.
*/
- private long deletedAt;
+ long deletedAt;
+
+ Chunk() {
+
+ }
+ private EndianType endianType;
- public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
+ public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt, EndianType endianType) {
this.chunkHeader = header;
this.chunkData = buffer;
this.deletedAt = deletedAt;
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index c63e8de,6a7e8e2..f4472ec
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@@ -73,25 -87,23 +87,24 @@@ public class ChunkWriterImpl implement
* statistic on a stage. It will be reset after calling {@code writeAllPagesOfSeriesToTsFile()}
*/
private Statistics<?> chunkStatistics;
+ // time of the latest written time value pair
- private long time;
- private long minTimestamp = Long.MIN_VALUE;
-
- private MeasurementSchema measurementSchema;
- private int ptNum;
/**
- * constructor of ChunkWriterImpl.
- *
- * @param chunkBuffer chunk in buffer
- * @param pageSizeThreshold page size threshold
+ * statistic on a page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
*/
- public ChunkWriterImpl(ChunkBuffer chunkBuffer, int pageSizeThreshold) {
- this.measurementSchema = chunkBuffer.getSchema();
- this.dataType = measurementSchema.getType();
- this.chunkBuffer = chunkBuffer;
- this.psThres = pageSizeThreshold;
+ //private Statistics<?> pageStatistics;
+ /**
+ * @param schema schema of this measurement
+ */
+ public ChunkWriterImpl(MeasurementSchema schema) {
+ this.measurementSchema = schema;
+ this.compressor = ICompressor.getCompressor(schema.getCompressor());
+ this.pageBuffer = new PublicBAOS();
+
+ this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig()
+ .getMaxNumberOfPointsInPage();
// initial check of memory usage. So that we have enough data to make an initial prediction
this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@@ -395,11 -282,119 +283,124 @@@
@Override
public TSDataType getDataType() {
- return dataType;
+ return measurementSchema.getType();
+ }
+
+ /**
+ * write the page header and data into the PageWriter's output stream.
+ *
+ * NOTE: for upgrading 0.8.0 to 0.9.0
+ */
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ numOfPages++;
+
+ // 1. update time statistics
+ if (this.chunkMinTime == Long.MIN_VALUE) {
+ this.chunkMinTime = header.getMinTimestamp();
+ }
+ if (this.chunkMinTime == Long.MIN_VALUE) {
+ throw new PageException("No valid data point in this page");
+ }
+ this.chunkMaxTime = header.getMaxTimestamp();
+
+ // write the page header to pageBuffer
+ try {
+ logger.debug("start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+ header.serializeTo(pageBuffer);
+ logger.debug("finish to flush a page header {} of {} into buffer, buffer position {} ", header,
+ measurementSchema.getMeasurementId(), pageBuffer.size());
+
+ } catch (IOException e) {
+ if (chunkPointCount == 0) {
+ chunkMinTime = Long.MIN_VALUE;
+ }
+ throw new PageException(
+ "IO Exception in writeDataPageHeader,ignore this page", e);
+ }
+
+ // update data point num
+ this.chunkPointCount += header.getNumOfValues();
+
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ channel.write(data);
+ } catch (IOException e) {
+ throw new PageException(e);
+ }
+ }
+
+ /**
+ * write the page to specified IOWriter.
+ *
+ * @param writer the specified IOWriter
+ * @param statistics the statistic information provided by series writer
+ * @return the data size of this chunk
+ * @throws IOException exception in IO
+ */
+ public long writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics)
+ throws IOException {
+ if (chunkPointCount == 0) {
+ return 0;
+ }
+
+ // start to write this column chunk
+ int headerSize = writer
+ .startFlushChunk(measurementSchema, compressor.getType(), measurementSchema.getType(),
+ measurementSchema.getEncodingType(), statistics, chunkMaxTime,
+ chunkMinTime, pageBuffer.size(),
+ numOfPages);
+
+ long dataOffset = writer.getPos();
+
+ // write all pages of this column
+ writer.writeBytesToStream(pageBuffer);
+
+ long dataSize = writer.getPos() - dataOffset;
+ if (dataSize != pageBuffer.size()) {
+ throw new IOException(
+ "Bytes written is inconsistent with the size of data: " + dataSize + " !="
+ + " " + pageBuffer.size());
+ }
+
+ writer.endChunk(chunkPointCount);
+ return headerSize + dataSize;
+ }
+
+ /**
+ * reset exist data in page for next stage.
+ */
+ public void reset() {
+ chunkMinTime = Long.MIN_VALUE;
+ pageBuffer.reset();
+ chunkPointCount = 0;
+ }
+
+ /**
+ * estimate max page memory size.
+ *
+ * @return the max possible allocated size currently
+ */
+ public long estimateMaxPageMemSize() {
+ // return the sum of size of buffer and page max size
+ return (long) (pageBuffer.size() + estimateMaxPageHeaderSize());
+ }
+
+ private int estimateMaxPageHeaderSize() {
+ return PageHeader.calculatePageHeaderSize(measurementSchema.getType());
+ }
+
+ /**
+ * get current data size.
+ *
+ * @return current data size that the writer has serialized.
+ */
+ public long getCurrentDataSize() {
+ return pageBuffer.size();
}
+
+ @Override
- public int getPtNum() {
- return ptNum;
++ public long getPtNum() {
++ return chunkPointCount + pageWriter.getPointNumber();
+ }
}
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
index fb0ff32,3f7f762..0785e49
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
@@@ -124,6 -124,4 +124,6 @@@ public interface IChunkWriter
int getNumOfPages();
TSDataType getDataType();
+
- int getPtNum();
++ long getPtNum();
}
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index fe5dfff,fe5dfff..7048fb9
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@@ -110,7 -110,7 +110,6 @@@ public class RestorableTsFileIOWriter e
}
}
-- @Override
public Map<String, MeasurementSchema> getKnownSchema() {
return knownSchemas;
}