You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 10:59:13 UTC
[iotdb] 01/01: merge master
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f342899dec2f1d056c027e7d7ef9dc62d07906e5
Merge: 327eb7e e09b377
Author: lta <li...@163.com>
AuthorDate: Thu Jan 14 18:58:33 2021 +0800
merge master
docker/src/main/Dockerfile-0.11.0 => .dockerignore | 24 +-
.github/pull_request_template.md | 37 +-
.github/workflows/e2e.yml | 52 ++
NOTICE | 2 +-
NOTICE-binary | 2 +-
README.md | 15 +
README_ZH.md | 14 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 17 +-
.../java/org/apache/iotdb/cli/AbstractCli.java | 138 ++-
cli/src/main/java/org/apache/iotdb/cli/Cli.java | 2 +
cli/src/main/java/org/apache/iotdb/cli/WinCli.java | 3 +
.../org/apache/iotdb/cli/utils/IoTPrinter.java | 107 +++
.../main/java/org/apache/iotdb/tool/ImportCsv.java | 63 +-
cluster/pom.xml | 7 +
cluster/src/assembly/resources/sbin/start-node.sh | 2 +-
.../java/org/apache/iotdb/cluster/ClusterMain.java | 20 +-
.../apache/iotdb/cluster/RemoteTsFileResource.java | 41 +-
.../cluster/client/async/AsyncClientPool.java | 79 +-
.../iotdb/cluster/client/sync/SyncClientPool.java | 24 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 14 +-
.../iotdb/cluster/config/ClusterConstant.java | 33 +-
.../iotdb/cluster/coordinator/Coordinator.java | 602 +++++++++++++
.../apache/iotdb/cluster/log/LogDispatcher.java | 16 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 4 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 8 +-
.../iotdb/cluster/log/catchup/CatchUpTask.java | 2 +-
.../cluster/log/manage/CommittedEntryManager.java | 5 +-
.../log/manage/MetaSingleSnapshotLogManager.java | 1 +
.../iotdb/cluster/log/manage/RaftLogManager.java | 26 +-
.../log/manage/UnCommittedEntryManager.java | 10 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 10 +-
.../cluster/query/manage/QueryCoordinator.java | 115 +--
.../apache/iotdb/cluster/server/ClientServer.java | 28 +-
.../iotdb/cluster/server/DataClusterServer.java | 2 +-
.../iotdb/cluster/server/MetaClusterServer.java | 17 +-
.../apache/iotdb/cluster/server/RaftServer.java | 7 +-
.../handlers/caller/AppendNodeEntryHandler.java | 6 +-
.../server/handlers/caller/HeartbeatHandler.java | 2 +-
.../cluster/server/heartbeat/HeartbeatThread.java | 8 +-
.../cluster/server/member/DataGroupMember.java | 17 +-
.../cluster/server/member/MetaGroupMember.java | 198 +++--
.../iotdb/cluster/server/member/RaftMember.java | 130 ++-
.../cluster/server/{ => monitor}/NodeReport.java | 3 +-
.../manage => server/monitor}/NodeStatus.java | 41 +-
.../monitor/NodeStatusManager.java} | 87 +-
.../iotdb/cluster/server/{ => monitor}/Peer.java | 2 +-
.../iotdb/cluster/server/{ => monitor}/Timer.java | 15 +-
.../cluster/server/service/MetaAsyncService.java | 6 +
.../cluster/server/service/MetaSyncService.java | 5 +
.../apache/iotdb/cluster/utils/ClusterUtils.java | 9 +-
.../cluster/utils/nodetool/ClusterMonitor.java | 2 +-
.../iotdb/cluster/common/EnvironmentUtils.java | 218 -----
.../org/apache/iotdb/cluster/common/IoTDBTest.java | 4 +-
.../cluster/integration/BaseSingleNodeTest.java | 2 +-
.../iotdb/cluster/integration/SingleNodeTest.java | 7 +-
.../iotdb/cluster/log/CommitLogCallbackTest.java | 2 +-
.../iotdb/cluster/log/CommitLogTaskTest.java | 2 +-
.../iotdb/cluster/log/LogDispatcherTest.java | 2 +-
.../log/applier/AsyncDataLogApplierTest.java | 2 +-
.../cluster/log/applier/DataLogApplierTest.java | 13 +-
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 9 +-
.../cluster/log/catchup/LogCatchUpTaskTest.java | 8 +-
.../log/catchup/SnapshotCatchUpTaskTest.java | 8 +-
.../manage/MetaSingleSnapshotLogManagerTest.java | 3 +
.../cluster/log/manage/RaftLogManagerTest.java | 22 +-
.../cluster/log/snapshot/DataSnapshotTest.java | 12 +-
.../log/snapshot/MetaSimpleSnapshotTest.java | 2 +
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 5 +
.../iotdb/cluster/partition/SlotManagerTest.java | 2 +-
.../cluster/partition/SlotPartitionTableTest.java | 2 +-
.../apache/iotdb/cluster/query/BaseQueryTest.java | 15 +-
.../query/ClusterAggregateExecutorTest.java | 48 +-
.../query/ClusterDataQueryExecutorTest.java | 22 +-
.../cluster/query/ClusterFillExecutorTest.java | 72 +-
.../cluster/query/ClusterPlanExecutorTest.java | 8 +-
.../cluster/query/ClusterQueryRouterTest.java | 216 ++---
.../ClusterGroupByNoVFilterDataSetTest.java | 64 +-
.../groupby/ClusterGroupByVFilterDataSetTest.java | 74 +-
.../query/groupby/MergeGroupByExecutorTest.java | 83 +-
.../query/groupby/RemoteGroupByExecutorTest.java | 146 ++--
.../cluster/query/manage/QueryCoordinatorTest.java | 15 +-
.../query/reader/ClusterTimeGeneratorTest.java | 34 +-
.../cluster/query/reader/DatasourceInfoTest.java | 16 +-
.../reader/RemoteSeriesReaderByTimestampTest.java | 122 +--
.../query/reader/RemoteSimpleSeriesReaderTest.java | 136 +--
.../caller/AppendGroupEntryHandlerTest.java | 2 +-
.../caller/AppendNodeEntryHandlerTest.java | 4 +-
.../handlers/caller/ElectionHandlerTest.java | 2 +-
.../handlers/caller/HeartbeatHandlerTest.java | 2 +-
.../handlers/caller/LogCatchUpHandlerTest.java | 2 +-
.../server/heartbeat/DataHeartbeatThreadTest.java | 5 +
.../server/heartbeat/HeartbeatThreadTest.java | 10 +-
.../server/heartbeat/MetaHeartbeatThreadTest.java | 5 +
.../cluster/server/member/DataGroupMemberTest.java | 129 +--
.../iotdb/cluster/server/member/MemberTest.java | 35 +-
.../cluster/server/member/MetaGroupMemberTest.java | 83 +-
docker/src/main/Dockerfile | 46 +-
docker/src/main/Dockerfile-0.10.0 | 4 +-
docker/src/main/Dockerfile-0.10.1 | 4 +-
docker/src/main/Dockerfile-0.11.0 | 4 +-
.../main/{Dockerfile-0.11.0 => Dockerfile-0.11.1} | 10 +-
.../main/{Dockerfile-0.11.0 => Dockerfile-0.11.2} | 10 +-
docs/UserGuide/Client/Programming - Native API.md | 18 +
docs/UserGuide/Client/Status Codes.md | 2 +
.../Ecosystem Integration/Zeppelin-IoTDB.md | 20 +-
docs/UserGuide/Operation Manual/Administration.md | 2 +
.../DML Data Manipulation Language.md | 155 +++-
.../Operation Manual/UDF User Defined Function.md | 192 ++++-
docs/UserGuide/Server/Config Manual.md | 18 +
docs/UserGuide/System Tools/CSV Tool.md | 1 +
docs/zh/UserGuide/Client/Command Line Interface.md | 2 +-
.../UserGuide/Client/Programming - Native API.md | 14 +
docs/zh/UserGuide/Client/Status Codes.md | 2 +
.../Ecosystem Integration/Zeppelin-IoTDB.md | 20 +-
.../UserGuide/Operation Manual/Administration.md | 2 +
.../DML Data Manipulation Language.md | 156 +++-
.../Operation Manual/UDF User Defined Function.md | 195 ++++-
docs/zh/UserGuide/Server/Config Manual.md | 17 +
docs/zh/UserGuide/System Tools/CSV Tool.md | 1 +
.../main/java/org/apache/iotdb/SessionExample.java | 20 +-
grafana/img/add_data_source.png | Bin 175851 -> 108927 bytes
grafana/img/add_graph.png | Bin 723579 -> 364163 bytes
grafana/img/edit_data_source.png | Bin 313673 -> 177869 bytes
.../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java | 4 +-
.../main/java/org/apache/iotdb/jdbc/Config.java | 18 +-
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 15 +-
.../apache/iotdb/jdbc/IoTDBConnectionParams.java | 20 +-
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 4 +-
.../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 6 +-
.../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 8 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 29 +-
.../src/main/java/org/apache/iotdb/jdbc/Utils.java | 7 +
pom.xml | 2 +-
.../file-changelists/TsFileResource-changelist.md | 8 +-
.../resources/conf/iotdb-engine.properties | 28 +
server/src/assembly/resources/conf/logback.xml | 25 +-
server/src/assembly/resources/sbin/start-server.sh | 2 +-
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 43 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 14 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 37 +-
.../db/engine/cache/TimeSeriesMetadataCache.java | 32 +-
.../compaction/CompactionMergeTaskPoolManager.java | 32 +-
.../db/engine/compaction/TsFileManagement.java | 38 +-
.../level/LevelCompactionTsFileManagement.java | 258 +++---
.../no/NoCompactionTsFileManagement.java | 10 +-
.../engine/compaction/utils/CompactionLogger.java | 2 +-
.../engine/compaction/utils/CompactionUtils.java | 12 +-
.../apache/iotdb/db/engine/flush/FlushManager.java | 24 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 48 +-
.../merge/selector/MaxFileMergeFileSelector.java | 11 +-
.../iotdb/db/engine/merge/task/MergeFileTask.java | 138 ++-
.../db/engine/merge/task/MergeMultiChunkTask.java | 4 +-
.../io/LocalTextModificationAccessor.java | 61 +-
.../engine/storagegroup/StorageGroupProcessor.java | 220 ++---
.../db/engine/storagegroup/TsFileProcessor.java | 89 +-
.../db/engine/storagegroup/TsFileResource.java | 356 +++-----
.../storagegroup/timeindex/DeviceTimeIndex.java | 308 +++++++
.../storagegroup/timeindex/FileTimeIndex.java | 193 +++++
.../engine/storagegroup/timeindex/ITimeIndex.java | 138 +++
.../storagegroup/timeindex/TimeIndexLevel.java} | 43 +-
.../apache/iotdb/db/exception/IoTDBException.java | 21 +
.../db/exception/PartitionViolationException.java | 8 +-
...xception.java => QueryIdNotExsitException.java} | 10 +-
.../iotdb/db/exception/StorageEngineException.java | 2 +-
.../db/exception/UDFRegistrationException.java | 7 +-
.../iotdb/db/exception/WriteProcessException.java | 4 +
.../metadata/AliasAlreadyExistException.java | 1 +
.../exception/metadata/IllegalPathException.java | 1 +
.../db/exception/metadata/MetadataException.java | 8 +
.../metadata/PathAlreadyExistException.java | 1 +
.../exception/metadata/PathNotExistException.java | 20 +-
.../metadata/StorageGroupNotSetException.java | 5 +
.../db/exception/query/OutOfTTLException.java | 2 +-
.../db/exception/query/QueryProcessException.java | 6 +-
.../QueryTimeoutRuntimeException.java} | 68 +-
.../org/apache/iotdb/db/metadata/MManager.java | 61 +-
.../java/org/apache/iotdb/db/metadata/MTree.java | 77 +-
.../iotdb/db/metadata/logfile/MLogWriter.java | 143 +---
.../apache/iotdb/db/metrics/ui/MetricsPage.java | 2 +-
.../apache/iotdb/db/monitor/MonitorConstants.java | 5 -
.../org/apache/iotdb/db/monitor/StatMonitor.java | 6 +-
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 123 +--
.../main/java/org/apache/iotdb/db/qp/Planner.java | 1 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 12 +
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 8 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 192 ++++-
.../org/apache/iotdb/db/qp/logical/Operator.java | 5 +-
...TracingOperator.java => KillQueryOperator.java} | 20 +-
.../db/qp/logical/sys/RemoveFileOperator.java | 5 -
.../db/qp/logical/sys/ShowDevicesOperator.java | 18 +
.../iotdb/db/qp/logical/sys/TracingOperator.java | 10 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 25 +
.../physical/crud/InsertRowsOfOneDevicePlan.java | 154 ++++
.../apache/iotdb/db/qp/physical/crud/UDFPlan.java | 3 +-
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 3 +-
.../db/qp/physical/sys/AlterTimeSeriesPlan.java | 2 +-
.../iotdb/db/qp/physical/sys/AuthorPlan.java | 2 +-
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 6 +-
.../db/qp/physical/sys/CreateTimeSeriesPlan.java | 14 +-
.../{ShowDevicesPlan.java => KillQueryPlan.java} | 25 +-
.../iotdb/db/qp/physical/sys/ShowDevicesPlan.java | 13 +-
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 51 +-
.../physical/sys/ShowQueryProcesslistPlan.java} | 56 +-
.../db/qp/physical/sys/ShowTimeSeriesPlan.java | 56 +-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 34 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 31 +-
.../db/qp/{constant => utils}/DatetimeUtils.java | 12 +-
.../db/query/aggregation/AggregateResult.java | 3 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 10 +-
.../db/query/aggregation/impl/SumAggrResult.java | 10 +-
.../iotdb/db/query/control/QueryFileManager.java | 2 +
.../db/query/control/QueryResourceManager.java | 3 +
.../iotdb/db/query/control/QueryTimeManager.java | 172 ++++
.../iotdb/db/query/control/TracingManager.java | 2 +-
.../apache/iotdb/db/query/dataset/ListDataSet.java | 12 +-
.../db/query/dataset/NonAlignEngineDataSet.java | 22 +
.../dataset/RawQueryDataSetWithoutValueFilter.java | 39 +-
.../apache/iotdb/db/query/dataset/ShowDataSet.java | 78 ++
.../iotdb/db/query/dataset/ShowDevicesDataSet.java | 58 ++
.../db/query/dataset/ShowTimeseriesDataSet.java | 46 +-
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 2 +-
.../dataset/groupby/GroupByEngineDataSet.java | 8 +-
.../groupby/GroupByWithValueFilterDataSet.java | 3 +-
.../db/query/executor/AggregationExecutor.java | 4 +-
.../db/query/executor/RawDataQueryExecutor.java | 4 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 14 +
.../org/apache/iotdb/db/query/udf/api/UDF.java | 15 +
.../org/apache/iotdb/db/query/udf/api/UDTF.java | 22 +-
.../db/query/udf/api/collector/PointCollector.java | 4 +-
.../api/customizer/config/UDTFConfigurations.java | 3 +-
.../parameter/UDFParameterValidator.java | 209 +++++
.../api/customizer/parameter/UDFParameters.java | 32 +
.../strategy/SlidingTimeWindowAccessStrategy.java | 2 +-
.../UDFAttributeNotProvidedException.java | 9 +-
.../udf/api/exception/UDFException.java} | 62 +-
.../UDFInputSeriesDataTypeNotValidException.java} | 34 +-
.../UDFInputSeriesIndexNotValidException.java} | 60 +-
.../UDFInputSeriesNumberNotValidException.java} | 17 +-
.../UDFParameterNotValidException.java} | 56 +-
.../db/query/udf/builtin/BuiltinFunction.java | 76 ++
.../iotdb/db/query/udf/builtin/UDTFAbs.java} | 148 ++--
.../udf/{api/UDF.java => builtin/UDTFAcos.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFAsin.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFAtan.java} | 57 +-
.../iotdb/db/query/udf/builtin/UDTFBottomK.java | 105 +++
.../udf/{api/UDF.java => builtin/UDTFCeil.java} | 57 +-
.../db/query/udf/builtin/UDTFCommonDerivative.java | 62 ++
.../udf/builtin/UDTFCommonValueDifference.java | 60 ++
.../iotdb/db/query/udf/builtin/UDTFContains.java} | 110 +--
.../udf/{api/UDF.java => builtin/UDTFCos.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFDegrees.java} | 57 +-
.../db/query/udf/builtin/UDTFDerivative.java} | 111 +--
.../udf/{api/UDF.java => builtin/UDTFExp.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFFloor.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFLog.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFLog10.java} | 57 +-
.../iotdb/db/query/udf/builtin/UDTFMatches.java} | 111 +--
.../iotdb/db/query/udf/builtin/UDTFMath.java | 89 ++
.../udf/builtin/UDTFNonNegativeDerivative.java | 63 ++
.../builtin/UDTFNonNegativeValueDifference.java | 61 ++
.../udf/{api/UDF.java => builtin/UDTFRadians.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFRound.java} | 57 +-
.../iotdb/db/query/udf/builtin/UDTFSelectK.java | 156 ++++
.../udf/{api/UDF.java => builtin/UDTFSign.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFSin.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFSqrt.java} | 57 +-
.../udf/{api/UDF.java => builtin/UDTFTan.java} | 57 +-
.../db/query/udf/builtin/UDTFTimeDifference.java} | 110 +--
.../iotdb/db/query/udf/builtin/UDTFTopK.java | 103 +++
.../db/query/udf/builtin/UDTFValueDifference.java} | 107 ++-
.../iotdb/db/query/udf/builtin/UDTFValueTrend.java | 73 ++
.../db/query/udf/core/executor/UDTFExecutor.java | 35 +-
.../udf/core/transformer/UDFQueryTransformer.java | 18 +-
.../query/udf/service/UDFClassLoaderManager.java | 9 +-
.../udf/service/UDFRegistrationInformation.java | 25 +-
.../query/udf/service/UDFRegistrationService.java | 145 +++-
.../apache/iotdb/db/rescon/MemTableManager.java | 116 +++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 1 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 9 +-
.../apache/iotdb/db/service/RegisterManager.java | 17 +-
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 948 +++++++++------------
.../db/sync/receiver/load/FileLoaderManager.java | 6 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 41 +-
.../apache/iotdb/db/tools/IoTDBDataDirViewer.java | 4 +-
.../iotdb/db/tools/TsFileResourcePrinter.java | 11 +-
.../db/tools/watermark/WatermarkDetector.java | 2 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 4 +
.../org/apache/iotdb/db/utils/SchemaUtils.java | 9 +-
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 3 +
.../main/resources/iotdb/ui/static/iotdb-logo.png | Bin 1768 -> 1187 bytes
.../compaction/LevelCompactionMergeTest.java | 4 +-
.../compaction/LevelCompactionRecoverTest.java | 10 +-
.../LevelCompactionTsFileManagementTest.java | 1 -
.../NoCompactionTsFileManagementTest.java | 1 -
.../engine/storagegroup/TsFileProcessorTest.java | 14 +-
.../iotdb/db/integration/IOTDBGroupByIT.java | 1 -
.../iotdb/db/integration/IoTDBClearCacheIT.java | 2 +-
.../iotdb/db/integration/IoTDBDeletionIT.java | 35 +
.../apache/iotdb/db/integration/IoTDBFillIT.java | 6 +
.../db/integration/IoTDBFlushQueryMergeIT.java | 2 +-
.../integration/IoTDBGroupByFillWithRangeIT.java | 3 -
.../iotdb/db/integration/IoTDBInsertNaNIT.java | 46 +-
.../iotdb/db/integration/IoTDBKillQueryTest.java | 84 ++
.../apache/iotdb/db/integration/IoTDBLastIT.java | 1 -
.../db/integration/IoTDBLevelCompactionIT.java | 1 -
.../db/integration/IoTDBLoadExternalTsfileIT.java | 20 +-
.../iotdb/db/integration/IoTDBMergeTest.java | 5 +
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 26 +-
.../db/integration/IoTDBQueryTimeoutTest.java | 154 ++++
.../iotdb/db/integration/IoTDBRestartIT.java | 40 +-
.../db/integration/IoTDBRpcCompressionIT.java | 1 -
.../iotdb/db/integration/IoTDBSensorUpdateIT.java | 3 -
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 52 ++
.../iotdb/db/integration/IoTDBUDFManagementIT.java | 163 +++-
.../integration/IoTDBUDTFAlignByTimeQueryIT.java | 213 +++++
.../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 250 ++++++
.../db/integration/IoTDBUDTFHybridQueryIT.java | 6 +-
.../aggregation/IoTDBAggregationIT.java | 24 +-
.../aggregation/IoTDBAggregationSmallDataIT.java | 4 +-
.../org/apache/iotdb/db/metadata/MTreeTest.java | 23 +
.../iotdb/db/monitor/IoTDBStatMonitorTest.java | 153 ++++
.../java/org/apache/iotdb/db/qp/PlannerTest.java | 59 +-
.../qp/{plan => logical}/IndexLogicalPlanTest.java | 2 +-
.../qp/{plan => logical}/LogicalPlanSmallTest.java | 3 +-
.../qp/{plan => physical}/ConcatOptimizerTest.java | 3 +-
.../IndexSubMatchingPhysicalPlanTest.java | 3 +-
.../IndexWholeMatchingPhysicalPlanTest.java | 3 +-
.../db/qp/physical/PhysicalPlanSerializeTest.java | 305 +++++++
.../db/qp/{plan => physical}/PhysicalPlanTest.java | 4 +-
.../qp/{plan => physical}/SerializationTest.java | 3 +-
.../db/qp/sql/DatetimeQueryDataSetUtilsTest.java | 142 ---
.../IoTDBsqlVisitorTest.java} | 4 +-
.../db/qp/utils/DatetimeQueryDataSetUtilsTest.java | 190 +++++
.../iotdb/db/query/control/TracingManagerTest.java | 15 +-
.../iotdb/db/query/dataset/ListDataSetTest.java | 2 +-
.../db/query/reader/series/SeriesReaderTest.java | 22 +-
.../iotdb/db/query/udf/example/Accumulator.java | 16 +-
.../apache/iotdb/db/query/udf/example/Adder.java | 19 +-
.../apache/iotdb/db/query/udf/example/Counter.java | 8 +-
...gSizeWindowConstructorTester1.java => Max.java} | 130 +--
.../iotdb/db/query/udf/example/Multiplier.java | 16 +-
.../SlidingSizeWindowConstructorTester0.java | 9 +-
.../SlidingSizeWindowConstructorTester1.java | 17 +-
.../SlidingTimeWindowConstructionTester.java | 17 +-
...onstructorTester1.java => TerminateTester.java} | 122 +--
.../{Multiplier.java => ValidateTester.java} | 100 +--
.../db/sync/receiver/load/FileLoaderTest.java | 8 +-
.../recover/SyncReceiverLogAnalyzerTest.java | 4 +-
.../org/apache/iotdb/db/tools/MLogParserTest.java | 35 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 13 +-
.../db/utils/datastructure/PrecisionTest.java | 22 +-
.../db/writelog/recover/DeviceStringTest.java | 8 +-
server/src/test/resources/iotdb-engine.properties | 2 +
server/src/test/resources/logback.xml | 1 +
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 9 +-
.../org/apache/iotdb/rpc/RpcTransportFactory.java | 14 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 8 +-
.../rpc/TCompressedElasticFramedTransport.java | 45 +-
.../apache/iotdb/rpc/TElasticFramedTransport.java | 41 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 2 +-
.../rpc/TimeoutChangeableTFastFramedTransport.java | 11 +-
.../main/java/org/apache/iotdb/session/Config.java | 13 +-
.../java/org/apache/iotdb/session/Session.java | 176 +++-
.../apache/iotdb/session/SessionConnection.java | 34 +-
.../org/apache/iotdb/session/SessionDataSet.java | 12 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 81 ++
.../iotdb/session/IoTDBSessionComplexIT.java | 3 +-
.../iotdb/session/IoTDBSessionIteratorIT.java | 29 +
.../apache/iotdb/session/IoTDBSessionSimpleIT.java | 130 +++
.../public/img/contributor-avatar/jlq.png | Bin 122303 -> 98500 bytes
.../public/img/contributor-avatar/kfx.jpeg | Bin 134257 -> 119029 bytes
.../public/img/contributor-avatar/xdh.jpg | Bin 136069 -> 123718 bytes
.../.vuepress/public/img/contributor-avatar/yt.jpg | Bin 131028 -> 115804 bytes
.../public/img/contributor-avatar/zss.jpg | Bin 98085 -> 91455 bytes
site/src/main/.vuepress/public/img/home-Slide1.png | Bin 438294 -> 323108 bytes
site/src/main/.vuepress/public/img/home-Slide2.png | Bin 440893 -> 323620 bytes
site/src/main/.vuepress/public/img/home-Slide3.png | Bin 441335 -> 324332 bytes
site/src/main/.vuepress/public/img/home-icon2.png | Bin 5529 -> 4463 bytes
site/src/main/.vuepress/public/img/home-icon3.png | Bin 20637 -> 10753 bytes
site/src/main/.vuepress/public/img/home-icon4.png | Bin 13225 -> 8681 bytes
site/src/main/.vuepress/public/img/home-icon5.png | Bin 1989 -> 1531 bytes
site/src/main/.vuepress/public/img/home-icon6.png | Bin 16502 -> 11537 bytes
site/src/main/.vuepress/public/img/logo.png | Bin 31747 -> 21687 bytes
site/src/main/.vuepress/public/img/tools.jpg | Bin 347602 -> 294103 bytes
spark-iotdb-connector/Readme.md | 32 +-
spark-tsfile/README.md | 31 +-
.../e2e/base/docker-compose.yaml | 54 +-
test/e2e/cases/README.md | 53 ++
.../e2e/cases/cli/README.md | 20 +-
.../e2e/cases/cli/cleanup.sh | 30 +-
.../e2e/cases/cli/docker-compose.yaml | 44 +-
test/e2e/cases/cli/res/init.sql | 26 +
.../Dockerfile-0.11.0 => test/e2e/cases/cli/run.sh | 51 +-
thrift/src/main/thrift/cluster.thrift | 7 +
thrift/src/main/thrift/rpc.thrift | 15 +
.../tsfile/encoding/bitpacking/IntPacker.java | 4 +-
.../tsfile/encoding/bitpacking/LongPacker.java | 4 +-
.../exception/QueryTimeoutRuntimeException.java | 68 +-
.../write/UnSupportedDataTypeException.java | 4 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 195 +++--
.../iotdb/tsfile/read/reader/LocalTsFileInput.java | 20 +
.../org/apache/iotdb/tsfile/utils/BytesUtils.java | 4 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 13 +-
.../org/apache/iotdb/tsfile/read/ReadTest.java | 22 +-
.../iotdb/tsfile/read/TimePlainEncodeReadTest.java | 5 -
.../tsfile/read/TimeSeriesMetadataReadTest.java | 87 ++
.../read/query/executor/QueryExecutorTest.java | 1 -
.../apache/iotdb/tsfile/utils/FileGenerator.java | 22 +-
.../tsfile/write/DefaultDeviceTemplateTest.java | 110 +++
zeppelin-interpreter/pom.xml | 2 +-
.../apache/zeppelin/iotdb/IoTDBInterpreter.java | 212 +++--
.../zeppelin/iotdb/IoTDBInterpreterTest.java | 108 ++-
417 files changed, 12524 insertions(+), 5187 deletions(-)
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index bcde426,39994bb..5266f4f
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@@ -299,10 -310,11 +312,11 @@@ public class ClusterMain
public int calculateSlotByPartitionNum(String storageGroupName, long partitionId,
int maxSlotNum) {
int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
- if (sgSerialNum > 0) {
- return maxSlotNum / k * sgSerialNum;
+ if (sgSerialNum >= 0) {
+ return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
} else {
- return defaultStrategy.calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum);
+ return defaultStrategy
+ .calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum);
}
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 0000000,4e15c6f..dbfb9ff
mode 000000,100644..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@@ -1,0 -1,598 +1,602 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.iotdb.cluster.coordinator;
+
+ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+ import org.apache.iotdb.cluster.config.ClusterDescriptor;
+ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
++import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+ import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+ import org.apache.iotdb.cluster.metadata.CMManager;
+ import org.apache.iotdb.cluster.partition.PartitionGroup;
+ import org.apache.iotdb.cluster.query.ClusterPlanRouter;
+ import org.apache.iotdb.cluster.rpc.thrift.Node;
++import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+ import org.apache.iotdb.cluster.server.RaftServer;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
+ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+ import org.apache.iotdb.cluster.utils.PartitionUtils;
+ import org.apache.iotdb.cluster.utils.StatusUtils;
+ import org.apache.iotdb.db.conf.IoTDBConstant;
+ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+ import org.apache.iotdb.db.exception.query.QueryProcessException;
+ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+ import org.apache.iotdb.db.service.IoTDB;
+ import org.apache.iotdb.rpc.RpcUtils;
+ import org.apache.iotdb.rpc.TSStatusCode;
+ import org.apache.iotdb.service.rpc.thrift.EndPoint;
+ import org.apache.iotdb.service.rpc.thrift.TSStatus;
+ import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.Map;
+
+ /**
+ * Coordinator of client non-query request
+ */
+ public class Coordinator {
+
+ private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+
+ private MetaGroupMember metaGroupMember;
+
+ private String name;
+ private Node thisNode;
+ /**
+ * router calculates the partition groups that a partitioned plan should be sent to
+ */
+ private ClusterPlanRouter router;
+
+ private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing "
+ + "the query, please retry or contact the DBA: ";
+
+ public Coordinator(MetaGroupMember metaGroupMember) {
+ this.metaGroupMember = metaGroupMember;
+ this.name = metaGroupMember.getName();
+ this.thisNode = metaGroupMember.getThisNode();
+ }
+
+ public Coordinator() {
+
+ }
+
+ public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+ this.metaGroupMember = metaGroupMember;
+ this.name = metaGroupMember.getName();
+ this.thisNode = metaGroupMember.getThisNode();
+ }
+
+ public void setRouter(ClusterPlanRouter router) {
+ this.router = router;
+ }
+
+ /**
+ * Execute a non-query plan. According to the type of the plan, the plan will be executed on all
+ * nodes (like timeseries deletion) or the nodes that belong to certain groups (like data
+ * ingestion).
+ *
+ * @param plan a non-query plan.
+ */
+ public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ TSStatus result;
+ long startTime = Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime();
+ if (PartitionUtils.isLocalNonQueryPlan(plan)) {
+ // run locally
+ result = executeNonQueryLocally(plan);
+ } else if (PartitionUtils.isGlobalMetaPlan(plan)) {
+ //forward the plan to all meta group nodes
+ result = metaGroupMember.processNonPartitionedMetaPlan(plan);
+ } else if (PartitionUtils.isGlobalDataPlan(plan)) {
+ //forward the plan to all data group nodes
+ result = processNonPartitionedDataPlan(plan);
+ } else {
+ //split the plan and forward them to some PartitionGroups
+ try {
+ result = processPartitionedPlan(plan);
+ } catch (UnsupportedPlanException e) {
+ return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage());
+ }
+ }
+ Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
+ return result;
+ }
+
+ /**
+ * execute a non-query plan that is not necessary to be executed on other nodes.
+ */
+ private TSStatus executeNonQueryLocally(PhysicalPlan plan) {
+ boolean execRet;
+ try {
+ execRet = metaGroupMember.getLocalExecutor().processNonQuery(plan);
+ } catch (QueryProcessException e) {
+ if (e.getErrorCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
+ logger.debug("meet error while processing non-query. ", e);
+ } else {
+ logger.warn("meet error while processing non-query. ", e);
+ }
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ return execRet
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
+ : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
+
+ /**
+ * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the
+ * DataGroupLeader should take the responsible to make sure that every node receives the plan.
+ * Thus the plan will be processed locally only by the DataGroupLeader and forwarded by non-leader
+ * nodes.
+ */
+ private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+ if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) {
+ try {
+ // as delete related plans may have abstract paths (paths with wildcards), we convert
+ // them to full paths so the executor nodes will not need to query the metadata holders,
+ // eliminating the risk that when they are querying the metadata holders, the timeseries
+ // has already been deleted
+ ((CMManager) IoTDB.metaManager).convertToFullPaths(plan);
+ } catch (PathNotExistException e) {
+ if (plan.getPaths().isEmpty()) {
+ // only reports an error when there is no matching path
+ return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
+ }
+ }
+ }
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+ logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
+ return forwardPlan(globalGroups, plan);
+ } catch (CheckConsistencyException e) {
+ logger.debug("Forwarding global data plan {} to meta leader {}", plan, metaGroupMember.getLeader());
+ metaGroupMember.waitLeader();
+ return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null);
+ }
+ }
+
+
+ /**
+ * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
+ * a data group. And these sub-plans will be sent to and executed on the corresponding groups
+ * separately.
+ */
+ public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
+ logger.debug("{}: Received a partitioned plan {}", name, plan);
+ if (metaGroupMember.getPartitionTable() == null) {
+ logger.debug("{}: Partition table is not ready", name);
+ return StatusUtils.PARTITION_TABLE_NOT_READY;
+ }
+
+ // split the plan into sub-plans that each only involve one data group
+ Map<PhysicalPlan, PartitionGroup> planGroupMap;
+ try {
+ planGroupMap = splitPlan(plan);
+ } catch (CheckConsistencyException checkConsistencyException) {
+ return StatusUtils
+ .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage());
+ }
+
+ // the storage group is not found locally
+ if (planGroupMap == null || planGroupMap.isEmpty()) {
+ if ((plan instanceof InsertPlan
+ || plan instanceof CreateTimeSeriesPlan
+ || plan instanceof CreateMultiTimeSeriesPlan)
+ && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
+ try {
+ ((CMManager) IoTDB.metaManager).createSchema(plan);
+ return processPartitionedPlan(plan);
+ } catch (MetadataException | CheckConsistencyException e) {
+ logger.error(
+ String.format("Failed to set storage group or create timeseries, because %s", e));
+ }
+ }
+ logger.error("{}: Cannot find storage groups for {}", name, plan);
+ return StatusUtils.NO_STORAGE_GROUP;
+ }
+ logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
+ return forwardPlan(planGroupMap, plan);
+ }
+
+ /**
+ * Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be
+ * returned. The error messages from each group (if any) will be compacted into one string.
+ *
+ * @param partitionGroups
+ * @param plan
+ */
+ private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ for (PartitionGroup partitionGroup : partitionGroups) {
+ if (partitionGroup.contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle it with in the group
+ logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader());
- status = metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
- .executeNonQueryPlan(plan);
++ status = metaGroupMember.getLocalDataMember(new RaftNode(partitionGroup.getHeader(),
++ partitionGroup.getId())).executeNonQueryPlan(plan);
+ } else {
+ // forward the query to the group that should handle it
+ logger.debug("Forward {} to a remote group of {}", plan,
+ partitionGroup.getHeader());
+ status = forwardPlan(plan, partitionGroup);
+ }
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && (
+ !(plan instanceof DeleteTimeSeriesPlan) ||
+ status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ status.getCode(), partitionGroup.getHeader(),
+ status.getMessage()));
+ }
+ }
+ if (errorCodePartitionGroups.isEmpty()) {
+ status = StatusUtils.OK;
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ /**
+ * split a plan into several sub-plans, each belongs to only one data group.
+ */
+ private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan)
+ throws UnsupportedPlanException, CheckConsistencyException {
+ Map<PhysicalPlan, PartitionGroup> planGroupMap = null;
+ try {
+ planGroupMap = router.splitAndRoutePlan(plan);
+ } catch (StorageGroupNotSetException e) {
+ // synchronize with the leader to see if this node has unpulled storage groups
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ try {
+ planGroupMap = router.splitAndRoutePlan(plan);
- } catch (MetadataException ex) {
++ } catch (MetadataException | UnknownLogTypeException ex) {
+ // ignore
+ }
- } catch (MetadataException e) {
++ } catch (MetadataException | UnknownLogTypeException e) {
+ logger.error("Cannot route plan {}", plan, e);
+ }
+ return planGroupMap;
+ }
+
+ /**
+ * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all
+ * nodes time out, will a TIME_OUT be returned.
+ *
+ * @param planGroupMap sub-plan -> belong data group pairs
+ */
+ private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ if (planGroupMap.size() == 1) {
+ status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+ } else {
+ if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) {
+ // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, each will correspond to a TSStatus as its
+ // execution result, as the plan is split and the sub-plans may have interleaving ranges,
+ // we must assure that each TSStatus is placed to the right position
+ // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
+ // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
+ // failure and success should be placed into proper positions in TSStatus.subStatus
+ status = forwardMultiSubPlan(planGroupMap, plan);
+ } else {
+ status = forwardToMultipleGroup(planGroupMap);
+ }
+ }
+ if (plan instanceof InsertPlan
+ && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
+ if (tmpStatus != null) {
+ status = tmpStatus;
+ }
+ }
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status
+ .isSetRedirectNode()) {
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ private TSStatus createTimeseriesForFailedInsertion(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+ // try to create timeseries
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+ boolean hasCreate;
+ try {
+ hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ } catch (IllegalPathException | CheckConsistencyException e) {
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ if (hasCreate) {
+ return forwardPlan(planGroupMap, plan);
+ } else {
+ logger.error("{}, Cannot auto create timeseries.", thisNode);
+ }
+ return null;
+ }
+
+
+ private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) {
+ TSStatus result;
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle it with in the group
+ long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .getOperationStartTime();
+ logger.debug("Execute {} in a local group of {}", entry.getKey(),
+ entry.getValue().getHeader());
- result = metaGroupMember.getLocalDataMember(entry.getValue().getHeader())
++ result = metaGroupMember.getLocalDataMember(new RaftNode(entry.getValue().getHeader(), entry.getValue().getId()))
+ .executeNonQueryPlan(entry.getKey());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ } else {
+ // forward the query to the group that should handle it
+ long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .getOperationStartTime();
+ logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ result = forwardPlan(entry.getKey(), entry.getValue());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ }
+ return result;
+ }
+
+ /**
+ * forward each sub-plan to its corresponding data group, if some groups goes wrong, the error
+ * messages from each group will be compacted into one string.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ boolean allRedirect = true;
+ EndPoint endPoint = null;
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ if (tmpStatus.isSetRedirectNode()) {
+ endPoint = tmpStatus.getRedirectNode();
+ } else {
+ allRedirect = false;
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage()));
+ }
+ }
+ TSStatus status;
+ if (errorCodePartitionGroups.isEmpty()) {
- status = StatusUtils.OK;
+ if (allRedirect) {
- status = StatusUtils.getStatus(status, endPoint);
++ status = new TSStatus();
++ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
++ } else {
++ status = StatusUtils.OK;
+ }
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+
+ /**
+ * Forward each sub-plan to its belonging data group, and combine responses from the groups.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
+ PhysicalPlan parentPlan) {
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ TSStatus[] subStatus = null;
+ boolean noFailure = true;
+ boolean isBatchFailure = false;
+ EndPoint endPoint = null;
+ int totalRowNum = 0;
+ // send sub-plans to each belonging data group and collect results
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus);
+ noFailure =
+ (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+ isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ || isBatchFailure;
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (parentPlan instanceof InsertTabletPlan) {
+ totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
+ }
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ // set the status from one group to the proper positions of the overall status
+ if (parentPlan instanceof InsertTabletPlan) {
+ PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
+ tmpStatus.subStatus.toArray(new TSStatus[]{}));
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
+ for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+ subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
+ }
+ }
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage(), tmpStatus.subStatus));
+ }
+ if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() &&
+ ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan)
+ .getMaxTime()) {
+ endPoint = tmpStatus.getRedirectNode();
+ }
+ }
+
+ if (parentPlan instanceof CreateMultiTimeSeriesPlan &&
+ !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ noFailure = false;
+ isBatchFailure = true;
+ for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan)
+ .getResults().entrySet()) {
+ subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
+ }
+ }
+ return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus,
+ errorCodePartitionGroups);
+ }
+
+ private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint,
+ boolean isBatchFailure, TSStatus[] subStatus,
+ List<String> errorCodePartitionGroups) {
+ TSStatus status;
+ if (noFailure) {
+ status = StatusUtils.OK;
+ if (endPoint != null) {
+ status = StatusUtils.getStatus(status, endPoint);
+ }
+ } else if (isBatchFailure) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+
+ /**
+ * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out,
+ * will a TIME_OUT be returned.
+ */
+ private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
+ for (Node node : group) {
+ TSStatus status;
+ try {
+ // only data plans are partitioned, so it must be processed by its data server instead of
+ // meta server
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ status = forwardDataPlanAsync(plan, node, group.getHeader());
+ } else {
+ status = forwardDataPlanSync(plan, node, group.getHeader());
+ }
+ } catch (IOException e) {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ if (!StatusUtils.TIME_OUT.equals(status)) {
+ if (!status.isSetRedirectNode()) {
+ status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort()));
+ }
+ return status;
+ } else {
+ logger.warn("Forward {} to {} timed out", plan, node);
+ }
+ }
+ logger.warn("Forward {} to {} timed out", plan, group);
+ return StatusUtils.TIME_OUT;
+ }
+
+ /**
+ * Forward a non-query plan to the data port of "receiver"
+ *
+ * @param plan a non-query plan
+ * @param header to determine which DataGroupMember of "receiver" will process the request.
+ * @return a TSStatus indicating if the forwarding is successful.
+ */
+ private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header)
+ throws IOException {
+ RaftService.AsyncClient client = metaGroupMember.getClientProvider().getAsyncDataClient(receiver,
+ RaftServer.getWriteOperationTimeoutMS());
+ return this.metaGroupMember.forwardPlanAsync(plan, receiver, header, client);
+ }
+
+ private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
+ throws IOException {
+ RaftService.Client client = null;
+ try {
+ client = metaGroupMember.getClientProvider().getSyncDataClient(receiver,
+ RaftServer.getWriteOperationTimeoutMS());
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client);
+ }
+
+ /**
+ * Get a thrift client that will connect to "node" using the data port.
+ *
+ * @param node the node to be connected
+ * @param timeout timeout threshold of connection
+ */
+ public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ return metaGroupMember.getClientProvider().getAsyncDataClient(node, timeout);
+ }
+
+ public Node getThisNode() {
+ return thisNode;
+ }
+
+ /**
+ * Get a thrift client that will connect to "node" using the data port.
+ *
+ * @param node the node to be connected
+ * @param timeout timeout threshold of connection
+ */
+ public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+ return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout);
+ }
+ }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index aaa03a4,d7dd5f9..705000f
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@@ -19,7 -19,6 +19,8 @@@
package org.apache.iotdb.cluster.log.applier;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
++import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@@ -70,19 -60,11 +71,20 @@@ public class MetaLogApplier extends Bas
} else {
logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
}
- } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) {
- } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
++ } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException | UnsupportedPlanException e) {
logger.debug("Exception occurred when executing {}", log, e);
log.setException(e);
} finally {
log.setApplied(true);
}
}
+
- private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
++ private void sendLogToAllDataGroups(Log log)
++ throws ChangeMembershipException, UnsupportedPlanException {
+ LogPlan plan = new LogPlan(log.serialize());
- TSStatus status = member.executeNonQueryPlan(plan);
++ TSStatus status = member.processPartitionedPlan(plan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
+ }
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 1e86e11,ff650e3..c92a482
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@@ -85,19 -81,4 +85,20 @@@ public class MetaSingleSnapshotLogManag
snapshot.setLastLogTerm(term);
return snapshot;
}
+
+ @Override
+ void applyEntries(List<Log> entries) {
+ for (Log entry : entries) {
+ if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) {
+ blockedUnappliedLogList.add(entry);
+ continue;
+ }
+ try {
+ ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
+ } catch (Exception e) {
++ logger.error("Can not apply log {}", entry, e);
+ entry.setException(e);
+ }
+ }
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index c57ee06,c27cf9e..f774567
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@@ -114,9 -114,9 +114,9 @@@ public abstract class RaftLogManager
* Each time new logs are appended, this condition will be notified so logs that have larger
* indices but arrived earlier can proceed.
*/
- private final Object logUpdateCondition = new Object();
+ private final Object[] logUpdateConditions = new Object[1024];
- private List<Log> blockedUnappliedLogList;
+ protected List<Log> blockedUnappliedLogList;
protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) {
this.logApplier = applier;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index c1ab3e5,dd63e2f..fe9b8f7
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@@ -303,15 -306,13 +307,13 @@@ public class ClientServer extends TSSer
GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
try {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- AsyncDataClient client = metaGroupMember
- .getClientProvider().getAsyncDataClient(queriedNode,
+ AsyncDataClient client = coordinator.getAsyncDataClient(queriedNode,
- RaftServer.getReadOperationTimeoutMS());
- client.endQuery(header, coordinator.getThisNode(), queryId, handler);
+ RaftServer.getReadOperationTimeoutMS());
- client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler);
++ client.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId, handler);
} else {
- SyncDataClient syncDataClient = metaGroupMember
- .getClientProvider().getSyncDataClient(queriedNode,
+ SyncDataClient syncDataClient = coordinator.getSyncDataClient(queriedNode,
- RaftServer.getReadOperationTimeoutMS());
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+ RaftServer.getReadOperationTimeoutMS());
- syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId);
++ syncDataClient.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId);
}
} catch (IOException | TException e) {
logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index d198039,8df621d..f97d181
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@@ -339,8 -344,18 +344,18 @@@ public class MetaClusterServer extends
}
@Override
- public void removeHardLink(String hardLinkPath,
+ public void removeHardLink(String hardLinkPath, int raftId,
AsyncMethodCallback<Void> resultHandler) {
- asyncService.removeHardLink(hardLinkPath, resultHandler);
+ asyncService.removeHardLink(hardLinkPath, raftId, resultHandler);
}
+
+ @Override
+ public void handshake(Node sender) {
+ syncService.handshake(sender);
+ }
+
+ @Override
+ public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+ asyncService.handshake(sender, resultHandler);
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index df45b53,f8811e9..7b4f663
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@@ -75,15 -75,15 +75,16 @@@ import org.apache.iotdb.cluster.rpc.thr
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
- import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
- import org.apache.iotdb.cluster.server.Peer;
+ import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
+ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+ import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.PullSnapshotHintService;
import org.apache.iotdb.cluster.server.Response;
- import org.apache.iotdb.cluster.server.Timer;
- import org.apache.iotdb.cluster.server.Timer.Statistic;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
+ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@@ -824,8 -768,8 +825,8 @@@ public class DataGroupMember extends Ra
lastReportedLogIndex = logManager.getLastLogIndex();
return new DataMemberReport(character, leader.get(), term.get(),
logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(),
- logManager.getCommitLogTerm(), getHeader(), readOnly,
+ logManager.getCommitLogTerm(), getHeader(), getRaftGroupId(), readOnly,
- QueryCoordinator.getINSTANCE()
+ NodeStatusManager.getINSTANCE()
.getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex,
logManager.getMaxHaveAppliedCommitIndex());
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 1534323,74b9d3b..5484bb5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -19,42 -19,6 +19,7 @@@
package org.apache.iotdb.cluster.server.member;
- import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
- import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
-
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.BufferedReader;
- import java.io.BufferedWriter;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
- import java.io.FileReader;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
+import java.util.Map.Entry;
- import java.util.Objects;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.client.DataClientProvider;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@@ -65,17 -29,15 +30,17 @@@ import org.apache.iotdb.cluster.client.
import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+ import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.AddSelfException;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.EmptyIntervalException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
- import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@@ -89,9 -50,8 +54,7 @@@ import org.apache.iotdb.cluster.partiti
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.ClusterPlanRouter;
- import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
--import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
@@@ -107,12 -64,9 +70,8 @@@ import org.apache.iotdb.cluster.server.
import org.apache.iotdb.cluster.server.DataClusterServer;
import org.apache.iotdb.cluster.server.HardLinkCleaner;
import org.apache.iotdb.cluster.server.NodeCharacter;
- import org.apache.iotdb.cluster.server.NodeReport;
- import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
- import org.apache.iotdb.cluster.server.Timer;
--import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
@@@ -155,6 -101,42 +117,40 @@@ import org.apache.thrift.transport.TTra
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+ import java.io.BufferedInputStream;
+ import java.io.BufferedOutputStream;
+ import java.io.BufferedReader;
+ import java.io.BufferedWriter;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.FileReader;
+ import java.io.FileWriter;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+ import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
+ import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
+
@SuppressWarnings("java:S1135")
public class MetaGroupMember extends RaftMember {
@@@ -172,9 -154,7 +168,10 @@@
* in case of data loss, some file changes would be made to a temporary file first
*/
private static final String TEMP_SUFFIX = ".tmp";
+
+ private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing "
+ + "the query, please retry or contact the DBA: ";
+
private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class);
/**
* when joining a cluster this node will retry at most "DEFAULT_JOIN_RETRY" times before returning
@@@ -265,12 -264,12 +276,12 @@@
public MetaGroupMember() {
}
- public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
+ public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) throws QueryProcessException {
super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+ new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
- allNodes = new ArrayList<>();
+ allNodes = new PartitionGroup();
initPeerMap();
dataClientProvider = new DataClientProvider(factory);
@@@ -1308,415 -1402,6 +1319,415 @@@
}
/**
+ * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the
+ * DataGroupLeader should take the responsible to make sure that every node receives the plan.
+ * Thus the plan will be processed locally only by the DataGroupLeader and forwarded by non-leader
+ * nodes.
+ */
+ private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+ if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) {
+ try {
+ // as delete related plans may have abstract paths (paths with wildcards), we convert
+ // them to full paths so the executor nodes will not need to query the metadata holders,
+ // eliminating the risk that when they are querying the metadata holders, the timeseries
+ // has already been deleted
+ ((CMManager) IoTDB.metaManager).convertToFullPaths(plan);
+ } catch (PathNotExistException e) {
+ if (plan.getPaths().isEmpty()) {
+ // only reports an error when there is no matching path
+ return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
+ }
+ }
+ }
+ try {
+ syncLeaderWithConsistencyCheck(true);
+ List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
+ logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
+ return forwardPlan(globalGroups, plan);
+ } catch (CheckConsistencyException e) {
+ logger.debug("Forwarding global data plan {} to meta leader {}", plan, leader.get());
+ waitLeader();
+ return forwardPlan(plan, leader.get(), null);
+ }
+ }
+
+ /**
+ * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
+ * a data group. And these sub-plans will be sent to and executed on the corresponding groups
+ * separately.
+ */
+ public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
+ logger.debug("{}: Received a partitioned plan {}", name, plan);
+ if (partitionTable == null) {
+ logger.debug("{}: Partition table is not ready", name);
+ return StatusUtils.PARTITION_TABLE_NOT_READY;
+ }
+
+ // split the plan into sub-plans that each only involve one data group
+ Map<PhysicalPlan, PartitionGroup> planGroupMap;
+ try {
+ planGroupMap = splitPlan(plan);
+ } catch (CheckConsistencyException checkConsistencyException) {
+ return StatusUtils
+ .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage());
+ }
+
+ // the storage group is not found locally
+ if (planGroupMap == null || planGroupMap.isEmpty()) {
+ if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan
+ || plan instanceof CreateMultiTimeSeriesPlan)
+ && config.isEnableAutoCreateSchema()) {
+ logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
+ try {
+ ((CMManager) IoTDB.metaManager).createSchema(plan);
+ return processPartitionedPlan(plan);
+ } catch (MetadataException | CheckConsistencyException e) {
+ logger.error(
+ String.format("Failed to set storage group or create timeseries, because %s", e));
+ }
+ }
+ logger.error("{}: Cannot find storage groups for {}", name, plan);
+ return StatusUtils.NO_STORAGE_GROUP;
+ }
+ logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
+ return forwardPlan(planGroupMap, plan);
+ }
+
+ /**
+ * split a plan into several sub-plans, each belongs to only one data group.
+ */
+ private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan)
+ throws UnsupportedPlanException, CheckConsistencyException {
+ Map<PhysicalPlan, PartitionGroup> planGroupMap = null;
+ try {
+ planGroupMap = router.splitAndRoutePlan(plan);
+ } catch (StorageGroupNotSetException e) {
+ // synchronize with the leader to see if this node has unpulled storage groups
+ syncLeaderWithConsistencyCheck(true);
+ try {
+ planGroupMap = router.splitAndRoutePlan(plan);
+ } catch (MetadataException | UnknownLogTypeException ex) {
+ // ignore
+ }
+ } catch (MetadataException | UnknownLogTypeException e) {
+ logger.error("Cannot route plan {}", plan, e);
+ }
+ return planGroupMap;
+ }
+
+ /**
+ * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all
+ * nodes time out, will a TIME_OUT be returned.
+ *
+ * @param planGroupMap sub-plan -> belong data group pairs
+ */
+ public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ if (planGroupMap.size() == 1) {
+ status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+ } else {
+ if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) {
+ // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, each will correspond to a TSStatus as its
+ // execution result, as the plan is split and the sub-plans may have interleaving ranges,
+ // we must assure that each TSStatus is placed to the right position
+ // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
+ // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
+ // failure and success should be placed into proper positions in TSStatus.subStatus
+ status = forwardMultiSubPlan(planGroupMap, plan);
+ } else {
+ status = forwardToMultipleGroup(planGroupMap);
+ }
+ }
+ if (plan instanceof InsertPlan
+ && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && config.isEnableAutoCreateSchema()) {
+ TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
+ if (tmpStatus != null) {
+ status = tmpStatus;
+ }
+ }
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status
- .isSetRedirectNode()) {
- status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ private TSStatus createTimeseriesForFailedInsertion(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+ // try to create timeseries
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+ boolean hasCreate;
+ try {
+ hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ } catch (IllegalPathException | CheckConsistencyException e) {
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ if (hasCreate) {
+ return forwardPlan(planGroupMap, plan);
+ } else {
+ logger.error("{}, Cannot auto create timeseries.", thisNode);
+ }
+ return null;
+ }
+
+ /**
+ * Forward each sub-plan to its belonging data group, and combine responses from the groups.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
+ PhysicalPlan parentPlan) {
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ TSStatus[] subStatus = null;
+ boolean noFailure = true;
+ boolean isBatchFailure = false;
+ EndPoint endPoint = null;
+ int totalRowNum = 0;
+ // send sub-plans to each belonging data group and collect results
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus);
+ noFailure =
+ (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+ isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ || isBatchFailure;
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (parentPlan instanceof InsertTabletPlan) {
+ totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
+ }
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ // set the status from one group to the proper positions of the overall status
+ if (parentPlan instanceof InsertTabletPlan) {
+ PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
+ tmpStatus.subStatus.toArray(new TSStatus[]{}));
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
+ for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+ subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
+ }
+ }
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage(), tmpStatus.subStatus));
+ }
+ if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() &&
+ ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan)
+ .getMaxTime()) {
+ endPoint = tmpStatus.getRedirectNode();
+ }
+ }
+
+ if (parentPlan instanceof CreateMultiTimeSeriesPlan &&
+ !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ noFailure = false;
+ isBatchFailure = true;
+ for (Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan)
+ .getResults().entrySet()) {
+ subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
+ }
+ }
+ return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus,
+ errorCodePartitionGroups);
+ }
+
+ private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint,
+ boolean isBatchFailure, TSStatus[] subStatus, List<String> errorCodePartitionGroups) {
+ TSStatus status;
+ if (noFailure) {
+ status = StatusUtils.OK;
+ if (endPoint != null) {
+ status = StatusUtils.getStatus(status, endPoint);
+ }
+ } else if (isBatchFailure) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+ private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) {
+ TSStatus result;
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle it with in the group
+ long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .getOperationStartTime();
+ logger.debug("Execute {} in a local group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId())
+ .executeNonQueryPlan(entry.getKey());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ } else {
+ // forward the query to the group that should handle it
+ long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .getOperationStartTime();
+ logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+ entry.getValue().getHeader());
+ result = forwardPlan(entry.getKey(), entry.getValue());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ }
+ return result;
+ }
+
+ /**
+ * forward each sub-plan to its corresponding data group, if some groups goes wrong, the error
+ * messages from each group will be compacted into one string.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ boolean allRedirect = true;
+ EndPoint endPoint = null;
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ if (tmpStatus.isSetRedirectNode()) {
+ endPoint = tmpStatus.getRedirectNode();
+ } else {
+ allRedirect = false;
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
+ tmpStatus.getMessage()));
+ }
+ }
+ TSStatus status;
+ if (errorCodePartitionGroups.isEmpty()) {
- status = StatusUtils.OK;
+ if (allRedirect) {
- status = StatusUtils.getStatus(status, endPoint);
++ status = new TSStatus();
++ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
++ } else {
++ status = StatusUtils.OK;
+ }
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+ /**
+ * Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be
+ * returned. The error messages from each group (if any) will be compacted into one string.
+ *
+ * @para plan
+ */
+ private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ for (PartitionGroup partitionGroup : partitionGroups) {
+ if (partitionGroup.contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle it with in the group
+ logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader());
+ status = getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
+ .executeNonQueryPlan(plan);
+ } else {
+ // forward the query to the group that should handle it
+ logger.debug("Forward {} to a remote group of {}", plan,
+ partitionGroup.getHeader());
+ status = forwardPlan(plan, partitionGroup);
+ }
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && (
+ !(plan instanceof DeleteTimeSeriesPlan) ||
+ status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ status.getCode(), partitionGroup.getHeader(),
+ status.getMessage()));
+ }
+ }
+ if (errorCodePartitionGroups.isEmpty()) {
+ status = StatusUtils.OK;
+ } else {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ /**
+ * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out,
+ * will a TIME_OUT be returned.
+ */
+ private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
+ for (Node node : group) {
+ TSStatus status;
+ try {
+ // only data plans are partitioned, so it must be processed by its data server instead of
+ // meta server
+ if (config.isUseAsyncServer()) {
+ status = forwardDataPlanAsync(plan, node, group.getHeader());
+ } else {
+ status = forwardDataPlanSync(plan, node, group.getHeader());
+ }
+ } catch (IOException e) {
+ status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ if (!StatusUtils.TIME_OUT.equals(status)) {
+ if (!status.isSetRedirectNode()) {
+ status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort()));
+ }
+ return status;
+ } else {
+ logger.warn("Forward {} to {} timed out", plan, node);
+ }
+ }
+ logger.warn("Forward {} to {} timed out", plan, group);
+ return StatusUtils.TIME_OUT;
+ }
+
+ /**
+ * Forward a non-query plan to the data port of "receiver"
+ *
+ * @param plan a non-query plan
+ * @param header to determine which DataGroupMember of "receiver" will process the request.
+ * @return a TSStatus indicating if the forwarding is successful.
+ */
+ private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header)
+ throws IOException {
+ RaftService.AsyncClient client = getClientProvider().getAsyncDataClient(receiver,
+ RaftServer.getWriteOperationTimeoutMS());
+ return forwardPlanAsync(plan, receiver, header, client);
+ }
+
+ private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
+ throws IOException {
+ Client client = null;
+ try {
+ client = getClientProvider().getSyncDataClient(receiver,
+ RaftServer.getWriteOperationTimeoutMS());
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ return forwardPlanSync(plan, receiver, header, client);
+ }
+
+ /**
++=======
++>>>>>>> master
* Get the data groups that should be queried when querying "path" with "filter". First, the time
* interval qualified by the filter will be extracted. If any side of the interval is open, query
* all groups. Otherwise compute all involved groups w.r.t. the time partitioning.
@@@ -2096,7 -1764,7 +2108,11 @@@
this.dataClientProvider = dataClientProvider;
}
+ public void setRouter(ClusterPlanRouter router) {
+ this.router = router;
+ }
++
+ public void handleHandshake(Node sender) {
+ NodeStatusManager.getINSTANCE().activate(sender);
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 121892b,fc83ba4..d05ddd9
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -85,6 -84,6 +85,7 @@@ import org.apache.iotdb.cluster.utils.C
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
++import org.apache.iotdb.cluster.utils.nodetool.function.Status;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@@ -932,6 -876,6 +907,7 @@@ public abstract class RaftMember
try {
if (appendLogInGroup(log)) {
++ TSStatus res = StatusUtils.OK;
return StatusUtils.OK;
}
} catch (LogExecutionException e) {
@@@ -1221,9 -1165,9 +1197,9 @@@
return forwardPlanAsync(plan, receiver, header, client);
}
- TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) {
+ public TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) {
try {
- TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver);
+ TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver, getRaftGroupId());
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
@@@ -1249,10 -1193,9 +1225,10 @@@
return forwardPlanSync(plan, receiver, header, client);
}
- TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) {
+ public TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) {
try {
ExecutNonQueryReq req = new ExecutNonQueryReq();
+ req.setRaftId(getRaftGroupId());
req.setPlanBytes(PlanSerializer.getInstance().serialize(plan));
if (header != null) {
req.setHeader(header);
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 3b2df98,114aa5a..a555e17
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@@ -197,10 -195,14 +197,16 @@@ public class MetaAsyncService extends B
* @param resultHandler
*/
@Override
- public void exile(AsyncMethodCallback<Void> resultHandler) {
- metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+ public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.deserialize(removeNodeLogBuffer);
+ metaGroupMember.applyRemoveNode(removeNodeLog);
resultHandler.onComplete(null);
}
+
+ @Override
+ public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+ metaGroupMember.handleHandshake(sender);
+ resultHandler.onComplete(null);
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 48c0e58,ec1519a..d88f4f5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@@ -190,9 -188,12 +190,14 @@@ public class MetaSyncService extends Ba
* must tell it directly.
*/
@Override
- public void exile() {
- metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+ public void exile(ByteBuffer removeNodeLogBuffer) {
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.deserialize(removeNodeLogBuffer);
+ metaGroupMember.applyRemoveNode(removeNodeLog);
}
+
+ @Override
+ public void handshake(Node sender) {
+ metaGroupMember.handleHandshake(sender);
+ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index a0e5f56,28922ad..890b402
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@@ -28,9 -27,8 +28,9 @@@ import org.apache.iotdb.cluster.partiti
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.MetaClusterServer;
- import org.apache.iotdb.cluster.server.Timer;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StartupException;
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index fbffb7f,6cc5418..65bb031
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@@ -988,70 -971,74 +993,74 @@@ public class DataGroupMemberTest extend
request.setTimeFilterBytes(SerializeUtils.serializeFilter(timeFilter));
QueryContext queryContext =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
- request.setQueryId(queryContext.getQueryId());
- request.setRequestor(TestUtils.getNode(0));
- request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
- request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
- request.setAscending(true);
-
- DataGroupMember dataGroupMember;
- AtomicReference<Long> resultRef;
- GenericHandler<Long> handler;
- Long executorId;
- AtomicReference<List<ByteBuffer>> aggrResultRef;
- GenericHandler<List<ByteBuffer>> aggrResultHandler;
- List<ByteBuffer> byteBuffers;
- List<AggregateResult> aggregateResults;
- Object[] answers;
- // get an executor from a node holding this timeseries
- request.setHeader(TestUtils.getNode(10));
- dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
try {
- resultRef = new AtomicReference<>();
- handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
- new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
- executorId = resultRef.get();
- assertEquals(1L, (long) executorId);
-
- // fetch result
- aggrResultRef = new AtomicReference<>();
- aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
- new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
-
- byteBuffers = aggrResultRef.get();
- assertNotNull(byteBuffers);
- aggregateResults = new ArrayList<>();
- for (ByteBuffer byteBuffer : byteBuffers) {
- aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+ request.setQueryId(queryContext.getQueryId());
+ request.setRequestor(TestUtils.getNode(0));
+ request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
+ request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
+ request.setAscending(true);
+
+ DataGroupMember dataGroupMember;
+ AtomicReference<Long> resultRef;
+ GenericHandler<Long> handler;
+ Long executorId;
+ AtomicReference<List<ByteBuffer>> aggrResultRef;
+ GenericHandler<List<ByteBuffer>> aggrResultHandler;
+ List<ByteBuffer> byteBuffers;
+ List<AggregateResult> aggregateResults;
+ Object[] answers;
+ // get an executor from a node holding this timeseries
+ request.setHeader(TestUtils.getNode(10));
+ dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
+ try {
+ resultRef = new AtomicReference<>();
+ handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+ new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+ executorId = resultRef.get();
+ assertEquals(1L, (long) executorId);
+
+ // fetch result
+ aggrResultRef = new AtomicReference<>();
+ aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+ new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
++ .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
+
+ byteBuffers = aggrResultRef.get();
+ assertNotNull(byteBuffers);
+ aggregateResults = new ArrayList<>();
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+ }
+ answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
+ checkAggregates(answers, aggregateResults);
+ } finally {
+ dataGroupMember.closeLogManager();
}
- answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
- checkAggregates(answers, aggregateResults);
- } finally {
- dataGroupMember.closeLogManager();
- }
- // get an executor from a node not holding this timeseries
- request.setHeader(TestUtils.getNode(30));
- dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
- try {
- resultRef = new AtomicReference<>();
- handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
- request.timeFilterBytes.position(0);
- new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
- executorId = resultRef.get();
- assertEquals(-1L, (long) executorId);
-
- // fetch result
- aggrResultRef = new AtomicReference<>();
- aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
- new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
-
- byteBuffers = aggrResultRef.get();
- assertNull(byteBuffers);
+ // get an executor from a node not holding this timeseries
+ request.setHeader(TestUtils.getNode(30));
+ dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
+ try {
+ resultRef = new AtomicReference<>();
+ handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+ request.timeFilterBytes.position(0);
+ new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+ executorId = resultRef.get();
+ assertEquals(-1L, (long) executorId);
+
+ // fetch result
+ aggrResultRef = new AtomicReference<>();
+ aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+ new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
++ .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
+
+ byteBuffers = aggrResultRef.get();
+ assertNull(byteBuffers);
+ } finally {
+ dataGroupMember.closeLogManager();
+ }
} finally {
- dataGroupMember.closeLogManager();
+ QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
}
}
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 8c1beef,e2a5767..d5a6515
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@@ -100,8 -100,11 +105,10 @@@ public class MemberTest
prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
preLogBufferSize = ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize();
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
- ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096);
testThreadPool = Executors.newFixedThreadPool(4);
prevLeaderWait = RaftMember.getWaitLeaderTimeMs();
+ prevEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal();
+ IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
RaftMember.setWaitLeaderTimeMs(10);
allNodes = new PartitionGroup();
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 94f5067,fe0adfc..12fe37e
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@@ -73,10 -67,8 +74,9 @@@ import org.apache.iotdb.cluster.metadat
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.query.ClusterPlanRouter;
import org.apache.iotdb.cluster.query.LocalQueryExecutor;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
- import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@@ -173,6 -163,6 +174,13 @@@ public class MetaGroupMemberTest extend
RaftServer.setReadOperationTimeoutMS(1000);
super.setUp();
++ partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) {
++ @Override
++ public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
++ return new RaftNode(TestUtils.getNode(0), 0);
++ }
++ };
++ testMetaMember.setPartitionTable(partitionTable);
dummyResponse.set(Response.RESPONSE_AGREE);
testMetaMember.setAllNodes(allNodes);
@@@ -186,14 -176,8 +194,14 @@@
buildDataGroups(dataClusterServer);
testMetaMember.getThisNode().setNodeIdentifier(0);
+ testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){
+ @Override
+ protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) {
+ return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
+ }
+ });
mockDataClusterServer = false;
- QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
exiledNode = null;
System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
}
@@@ -307,9 -286,8 +315,9 @@@
return resp;
}
+ @Override
protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException {
- MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) {
+ MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node, new Coordinator()) {
@Override
public DataClusterServer getDataClusterServer() {
@@@ -893,7 -914,7 +949,8 @@@
public void testProcessValidHeartbeatReq() throws QueryProcessException {
System.out.println("Start testProcessValidHeartbeatReq()");
MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
+ partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+ testMetaMember.setCoordinator(new Coordinator());
try {
HeartBeatRequest request = new HeartBeatRequest();
request.setRequireIdentifier(true);
diff --cc server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b03f984,30d5d52..c9d130e
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@@ -500,10 -510,12 +510,10 @@@ public class StorageEngine implements I
* @throws StorageGroupNotSetException
*/
public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId,
- boolean isSeq,
- boolean isSync)
- throws StorageGroupNotSetException {
+ boolean isSeq, boolean isSync) throws StorageGroupNotSetException {
StorageGroupProcessor processor = processorMap.get(storageGroupPath);
if (processor == null) {
- throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
+ throw new StorageGroupNotSetException(storageGroupPath.getFullPath(), true);
}
logger.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
diff --cc server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index aa31a4e,e733813..8928110
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@@ -114,115 -115,65 +115,66 @@@ public class MLogWriter implements Auto
}
}
- public synchronized void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
- try {
- putLog(createTimeSeriesPlan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
+ putLog(createTimeSeriesPlan);
}
- public synchronized void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
- try {
- putLog(deleteTimeSeriesPlan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+ putLog(deleteTimeSeriesPlan);
}
- public synchronized void setStorageGroup(PartialPath storageGroup) throws IOException {
- try {
- SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void setStorageGroup(PartialPath storageGroup) throws IOException {
+ SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+ putLog(plan);
}
- public synchronized void deleteStorageGroup(PartialPath storageGroup) throws IOException {
- try {
- DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+ DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+ putLog(plan);
}
- public synchronized void setTTL(PartialPath storageGroup, long ttl) throws IOException {
- try {
- SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+ SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+ putLog(plan);
}
- public synchronized void changeOffset(PartialPath path, long offset) throws IOException {
- try {
- ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void changeOffset(PartialPath path, long offset) throws IOException {
+ ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+ putLog(plan);
}
- public synchronized void changeAlias(PartialPath path, String alias) throws IOException {
- try {
- ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
- }
+ public void changeAlias(PartialPath path, String alias) throws IOException {
+ ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+ putLog(plan);
}
- public synchronized void serializeMNode(MNode node) throws IOException {
- try {
- int childSize = 0;
- if (node.getChildren() != null) {
- childSize = node.getChildren().size();
- }
- MNodePlan plan = new MNodePlan(node.getName(), childSize);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
+ public void serializeMNode(MNode node) throws IOException {
+ int childSize = 0;
+ if (node.getChildren() != null) {
+ childSize = node.getChildren().size();
}
+ MNodePlan plan = new MNodePlan(node.getName(), childSize);
+ putLog(plan);
}
- public synchronized void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
- try {
- int childSize = 0;
- if (node.getChildren() != null) {
- childSize = node.getChildren().size();
- }
- MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
- node.getOffset(), childSize, node.getSchema());
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
++
+ public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+ int childSize = 0;
+ if (node.getChildren() != null) {
+ childSize = node.getChildren().size();
}
+ MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+ node.getOffset(), childSize, node.getSchema());
+ putLog(plan);
}
- public synchronized void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
- try {
- int childSize = 0;
- if (node.getChildren() != null) {
- childSize = node.getChildren().size();
- }
- StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
- putLog(plan);
- } catch (BufferOverflowException e) {
- throw new IOException(
- LOG_TOO_LARGE_INFO, e);
+ public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+ int childSize = 0;
+ if (node.getChildren() != null) {
+ childSize = node.getChildren().size();
}
+ StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+ putLog(plan);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 0d64b30,e9bf655..dc553be
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@@ -366,7 -361,8 +366,8 @@@ public abstract class PhysicalPlan
REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
- CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG
- CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,
++ CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG,
+ BATCH_INSERT_ONE_DEVICE
}
public long getIndex() {