You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/25 12:18:48 UTC

[iotdb] branch pipe-wal-resource-management created (now 384786b6a84)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 384786b6a84 Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

This branch includes the following new commits:

     new 31fa754dcb6 reduce heartbeat frequency
     new 865f8914ce6 Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management
     new 8bedf08b00b wal resource manager
     new 77ce31eb61d bind WALPipeHandle
     new 384786b6a84 Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 05/05: Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 384786b6a84af3777c56f694036c6c5a50397307
Merge: 77ce31eb61d 05a54dc13d3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 25 19:57:19 2023 +0800

    Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

 .github/workflows/client-cpp.yml                   |    8 +-
 .github/workflows/client-go.yml                    |    2 +-
 .github/workflows/client-python.yml                |    6 +-
 .github/workflows/grafana-plugin.yml               |    4 +-
 .github/workflows/influxdb-protocol.yml            |    2 +-
 .gitignore                                         |    9 +
 .gitmodules                                        |    6 +-
 client-go                                          |    1 -
 .../heartbeat/DataNodeHeartbeatHandler.java        |   10 +-
 .../consensus/request/ConfigPhysicalPlan.java      |    6 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |    1 +
 .../PipeHandleLeaderChangePlan.java                |    2 +-
 .../pipe/runtime/PipeHandleMetaChangePlan.java     |   67 +
 .../iotdb/confignode/manager/ConfigManager.java    |    1 +
 .../iotdb/confignode/manager/ProcedureManager.java |   22 +
 .../manager/load/service/HeartbeatService.java     |   16 +-
 .../iotdb/confignode/manager/node/NodeManager.java |    4 +-
 .../manager/partition/PartitionMetrics.java        |   35 +-
 .../pipe/runtime/PipeRuntimeCoordinator.java       |   21 +
 .../manager/schema/ClusterSchemaManager.java       |    3 +-
 .../persistence/executor/ConfigPlanExecutor.java   |    7 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   13 +-
 .../persistence/pipe/PipeTaskOperation.java        |    2 +-
 .../procedure/env/DataNodeRemoveHandler.java       |    2 +
 .../runtime/PipeHandleLeaderChangeProcedure.java   |    2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |  285 ++++
 .../impl/schema/DeleteDatabaseProcedure.java       |    4 +-
 .../procedure/store/ProcedureFactory.java          |    6 +
 .../confignode/procedure/store/ProcedureType.java  |    3 +-
 .../thrift/ConfigNodeRPCServiceHandlerMetrics.java |    8 +-
 .../thrift/ConfigNodeRPCServiceMetrics.java        |    8 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   44 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |    1 -
 .../runtime/PipeHandleMetaChangeProcedureTest.java |   96 ++
 .../consensus/iot/IoTConsensusServerImpl.java      |   87 +-
 .../consensus/iot/IoTConsensusServerMetrics.java   |  327 ++--
 .../consensus/iot/client/DispatchLogHandler.java   |   25 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |   32 +-
 .../logdispatcher/LogDispatcherThreadMetrics.java  |  194 ++-
 .../ratis/ApplicationStateMachineProxy.java        |    2 +-
 .../iotdb/consensus/simple/SimpleConsensus.java    |    2 +-
 distribution/src/assembly/all.xml                  |    4 +-
 distribution/src/assembly/cli.xml                  |    4 +-
 distribution/src/assembly/client-cpp.xml           |    2 +-
 distribution/src/assembly/grafana-connector.xml    |    2 +-
 distribution/src/assembly/grafana-plugin.xml       |    2 +-
 distribution/src/assembly/spark-connector.xml      |    4 +-
 docker/src/main/DockerCompose/start-1c1d.sh        |    2 +-
 grafana-plugin/go.sum                              |  660 ---------
 influxdb-protocol/pom.xml                          |  140 --
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  411 ------
 .../iotdb/influxdb/IoTDBInfluxDBFactory.java       |   70 -
 .../protocol/constant/InfluxDBConstant.java        |   25 -
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |   85 --
 .../protocol/impl/IoTDBInfluxDBService.java        |   99 --
 .../influxdb/protocol/util/ParameterUtils.java     |   36 -
 .../iotdb/influxdb/session/InfluxDBSession.java    |  279 ----
 .../influxdb/integration/IoTDBInfluxDBIT.java      |  207 ---
 integration-test/pom.xml                           |    6 +
 integration-test/src/assembly/mpp-test.xml         |    4 +-
 .../it/IoTDBConfigNodeConsensusEfficiencyIT.java   |    2 -
 integration/README.md                              |  120 --
 integration/checkstyle.xml                         |  217 ---
 integration/import-control.xml                     |   29 -
 integration/pom.xml                                |  295 ----
 integration/src/assembly/cluster.xml               |   47 -
 .../db/engine/trigger/example/Accumulator.java     |   96 --
 .../iotdb/db/engine/trigger/example/Counter.java   |   97 --
 .../iotdb/db/query/udf/example/Accumulator.java    |  102 --
 .../apache/iotdb/db/query/udf/example/Adder.java   |   92 --
 .../apache/iotdb/db/query/udf/example/Counter.java |   88 --
 .../db/query/udf/example/ExampleUDFConstant.java   |   31 -
 .../org/apache/iotdb/db/query/udf/example/Max.java |   74 -
 .../iotdb/db/query/udf/example/Multiplier.java     |   63 -
 .../SlidingSizeWindowConstructorTester0.java       |   59 -
 .../SlidingSizeWindowConstructorTester1.java       |   64 -
 .../SlidingTimeWindowConstructionTester.java       |   72 -
 .../db/query/udf/example/TerminateTester.java      |   67 -
 .../iotdb/db/query/udf/example/ValidateTester.java |   44 -
 .../iotdb/db/query/udf/example/WindowStartEnd.java |   66 -
 .../iotdb/integration/env/ClusterEnvBase.java      |  279 ----
 .../iotdb/integration/env/ClusterEnvConfig.java    |  154 --
 .../apache/iotdb/integration/env/ClusterNode.java  |  179 ---
 .../iotdb/integration/env/ConfigFactory.java       |   52 -
 .../apache/iotdb/integration/env/EnvFactory.java   |   57 -
 .../iotdb/integration/env/FiveNodeCluster1Env.java |   64 -
 .../iotdb/integration/env/RemoteEnvConfig.java     |   23 -
 .../iotdb/integration/env/RemoteServerEnv.java     |  107 --
 .../apache/iotdb/itbase/category/ClusterTest.java  |   21 -
 .../iotdb/itbase/category/LocalStandaloneTest.java |   21 -
 .../apache/iotdb/itbase/category/RemoteTest.java   |   21 -
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |  112 --
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   39 -
 .../apache/iotdb/db/integration/IOTDBInsertIT.java |  200 ---
 .../IoTDBAlignedTimeSeriesCompactionIT.java        |  564 -------
 .../iotdb/db/integration/IoTDBCheckConfigIT.java   |  142 --
 .../iotdb/db/integration/IoTDBClearCacheIT.java    |  171 ---
 .../apache/iotdb/db/integration/IoTDBCloseIT.java  |  191 ---
 .../iotdb/db/integration/IoTDBCompactionIT.java    |  338 -----
 .../integration/IoTDBCompactionWithIDTableIT.java  |  352 -----
 .../iotdb/db/integration/IoTDBCompressTypeIT.java  |  101 --
 .../db/integration/IoTDBContinuousQueryIT.java     |  544 -------
 .../apache/iotdb/db/integration/IoTDBDaemonIT.java |  456 ------
 .../iotdb/db/integration/IoTDBDisableAlignIT.java  |  392 -----
 .../apache/iotdb/db/integration/IoTDBFillIT.java   | 1227 ---------------
 .../db/integration/IoTDBFlushQueryMergeIT.java     |  197 ---
 .../iotdb/db/integration/IoTDBKillQueryIT.java     |   80 -
 .../iotdb/db/integration/IoTDBLargeDataIT.java     |  373 -----
 .../apache/iotdb/db/integration/IoTDBLastIT.java   |  629 --------
 .../iotdb/db/integration/IoTDBMaxTimeQueryIT.java  |  139 --
 .../IoTDBMultiOverlappedChunkInUnseqIT.java        |  110 --
 .../db/integration/IoTDBMultiOverlappedPageIT.java |  166 ---
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |  471 ------
 .../db/integration/IoTDBMultiStatementsIT.java     |  187 ---
 .../db/integration/IoTDBNewTsFileCompactionIT.java | 1078 --------------
 .../db/integration/IoTDBOverlappedPageIT.java      |  194 ---
 .../iotdb/db/integration/IoTDBQueryTimeoutIT.java  |  156 --
 .../db/integration/IoTDBQueryWithIDTableIT.java    |  290 ----
 .../db/integration/IoTDBRemovePartitionIT.java     |  331 -----
 .../db/integration/IoTDBRepeatPatternNameIT.java   |   67 -
 .../db/integration/IoTDBRewriteTsFileToolIT.java   |  618 --------
 .../db/integration/IoTDBRpcCompressionIT.java      |  140 --
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  |   87 --
 .../db/integration/IoTDBSessionTimeoutIT.java      |   84 --
 .../IoTDBSetSystemReadOnlyWritableIT.java          |  258 ----
 .../integration/IoTDBSizeTieredCompactionIT.java   | 1370 -----------------
 .../integration/IoTDBSortedShowTimeseriesIT.java   |  339 -----
 .../iotdb/db/integration/IoTDBTimePartitionIT.java |   90 --
 .../iotdb/db/integration/IoTDBTracingIT.java       |   84 --
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    |  354 -----
 .../db/integration/IoTDBUDFWindowQueryIT.java      |  808 ----------
 .../db/integration/IoTDBUDTFHybridQueryIT.java     |  160 --
 .../db/integration/IoTDBUDTFNonAlignQueryIT.java   |  341 -----
 .../db/integration/IoTDBUnseqOverlappedPageIT.java |  127 --
 .../iotdb/db/integration/IoTDBVersionIT.java       |   68 -
 .../iotdb/db/integration/env/StandaloneEnv.java    |   93 --
 .../db/integration/env/StandaloneEnvConfig.java    |  133 --
 .../versionadaption/IoTDBDDLVersionAdaptionIT.java |  362 -----
 .../IoTDBDeletionVersionAdaptionIT.java            |  303 ----
 .../IoTDBQueryVersionAdaptionIT.java               |  320 ----
 .../session/IoTDBSessionAlignedABDeviceIT.java     |  235 ---
 .../session/IoTDBSessionAlignedAggregationIT.java  |  263 ----
 .../IoTDBSessionAlignedAggregationWithUnSeqIT.java |  196 ---
 .../iotdb/session/IoTDBSessionIteratorIT.java      |  354 -----
 .../src/test/resources/iotdb-datanode.properties   |   24 -
 integration/src/test/resources/logback-test.xml    |   58 -
 {external-api => iotdb-api/external-api}/pom.xml   |    2 +-
 .../iotdb/external/api/IPropertiesLoader.java      |    0
 .../iotdb/external/api/ISeriesNumerMonitor.java    |    0
 .../external/api/thrift/JudgableServerContext.java |    0
 .../external/api/thrift/ServerContextFactory.java  |    0
 .../external-pipe-api}/pom.xml                     |    2 +-
 .../apache/iotdb/pipe/external/api/DataType.java   |    0
 .../external/api/ExternalPipeSinkWriterStatus.java |    0
 .../pipe/external/api/IExternalPipeSinkWriter.java |    0
 .../api/IExternalPipeSinkWriterFactory.java        |    0
 {pipe-api => iotdb-api/pipe-api}/pom.xml           |    2 +-
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |    0
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |    0
 .../java/org/apache/iotdb/pipe/api/PipePlugin.java |    0
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |    0
 .../java/org/apache/iotdb/pipe/api/access/Row.java |    0
 .../apache/iotdb/pipe/api/access/RowIterator.java  |    0
 .../iotdb/pipe/api/collector/EventCollector.java   |    0
 .../iotdb/pipe/api/collector/RowCollector.java     |    0
 .../api/customizer/PipeParameterValidator.java     |    0
 .../iotdb/pipe/api/customizer/PipeParameters.java  |    0
 .../api/customizer/PipeRuntimeConfiguration.java   |    0
 .../iotdb/pipe/api/customizer/PipeStrategy.java    |    0
 .../PipeCollectorRuntimeConfiguration.java         |    0
 .../PipeConnectorRuntimeConfiguration.java         |    0
 .../connector/parallel/ParallelStrategy.java       |    0
 .../retry/EqualRetryIntervalStrategy.java          |    0
 .../retry/ExponentialRetryIntervalStrategy.java    |    0
 .../customizer/connector/retry/RetryStrategy.java  |    0
 .../customizer/connector/reuse/ReuseStrategy.java  |    0
 .../PipeProcessorRuntimeConfiguration.java         |    0
 .../org/apache/iotdb/pipe/api/event/Event.java     |    0
 .../org/apache/iotdb/pipe/api/event/EventType.java |    0
 .../event/dml/insertion/TabletInsertionEvent.java  |    0
 .../event/dml/insertion/TsFileInsertionEvent.java  |    0
 .../PipeAttributeNotProvidedException.java         |    0
 .../api/exception/PipeConnectionException.java     |    0
 .../iotdb/pipe/api/exception/PipeException.java    |    0
 .../api/exception/PipeManagementException.java     |    0
 .../exception/PipeParameterNotValidException.java  |    0
 .../exception/PipeRuntimeCriticalException.java    |    0
 .../pipe/api/exception/PipeRuntimeException.java   |    0
 .../exception/PipeRuntimeNonCriticalException.java |    0
 .../exception/PipeStrategyNotValidException.java   |    0
 .../org/apache/iotdb/pipe/api/type/Binary.java     |    0
 .../java/org/apache/iotdb/pipe/api/type/Type.java  |    0
 .../subscription-api}/pom.xml                      |    2 +-
 .../api/SubscriptionConfiguration.java             |    0
 .../subscription/api/SubscriptionFactory.java      |    0
 .../iotdb/subscription/api/consumer/Consumer.java  |    0
 .../api/consumer/pull/PullConsumer.java            |    0
 .../api/consumer/push/DataArrivalListener.java     |    0
 .../api/consumer/push/ExceptionListener.java       |    0
 .../api/consumer/push/PushConsumer.java            |    0
 .../api/dataset/SubscriptionDataSet.java           |    0
 .../api/exception/SubscriptionException.java       |    0
 .../SubscriptionStrategyNotValidException.java     |    0
 .../api/strategy/SubscriptionStrategy.java         |    0
 .../disorder/DisorderHandlingStrategy.java         |    0
 .../api/strategy/disorder/IntolerableStrategy.java |    0
 .../api/strategy/disorder/WatermarkStrategy.java   |    0
 .../strategy/topic/MultipleConnectionStrategy.java |    0
 .../api/strategy/topic/SingleTopicStrategy.java    |    0
 .../api/strategy/topic/TopicsStrategy.java         |    0
 {trigger-api => iotdb-api/trigger-api}/pom.xml     |    2 +-
 .../java/org/apache/iotdb/trigger/api/Trigger.java |    0
 .../iotdb/trigger/api/TriggerAttributes.java       |    0
 .../iotdb/trigger/api/enums/FailureStrategy.java   |    0
 .../iotdb/trigger/api/enums/TriggerEvent.java      |    0
 .../iotdb/trigger/api/enums/TriggerType.java       |    0
 {udf-api => iotdb-api/udf-api}/pom.xml             |    2 +-
 .../main/java/org/apache/iotdb/udf/api/UDF.java    |    0
 .../main/java/org/apache/iotdb/udf/api/UDTF.java   |    0
 .../java/org/apache/iotdb/udf/api/access/Row.java  |    0
 .../apache/iotdb/udf/api/access/RowIterator.java   |    0
 .../org/apache/iotdb/udf/api/access/RowWindow.java |    0
 .../iotdb/udf/api/collector/PointCollector.java    |    0
 .../api/customizer/config/UDFConfigurations.java   |    0
 .../api/customizer/config/UDTFConfigurations.java  |    0
 .../parameter/UDFParameterValidator.java           |    0
 .../api/customizer/parameter/UDFParameters.java    |    0
 .../api/customizer/strategy/AccessStrategy.java    |    0
 .../strategy/MappableRowByRowAccessStrategy.java   |    0
 .../strategy/RowByRowAccessStrategy.java           |    0
 .../strategy/SessionTimeWindowAccessStrategy.java  |    0
 .../strategy/SlidingSizeWindowAccessStrategy.java  |    0
 .../strategy/SlidingTimeWindowAccessStrategy.java  |    0
 .../strategy/StateWindowAccessStrategy.java        |    0
 .../UDFAttributeNotProvidedException.java          |    0
 .../iotdb/udf/api/exception/UDFException.java      |    0
 .../UDFInputSeriesDataTypeNotValidException.java   |    0
 .../UDFInputSeriesIndexNotValidException.java      |    0
 .../UDFInputSeriesNumberNotValidException.java     |    0
 .../udf/api/exception/UDFManagementException.java  |    0
 .../UDFOutputSeriesDataTypeNotValidException.java  |    0
 .../exception/UDFParameterNotValidException.java   |    0
 .../java/org/apache/iotdb/udf/api/type/Binary.java |    0
 .../java/org/apache/iotdb/udf/api/type/Type.java   |    0
 {cli => iotdb-client/cli}/pom.xml                  |    2 +-
 {cli => iotdb-client/cli}/src/assembly/cli.xml     |    0
 .../src/assembly/resources/conf/logback-cli.xml    |    0
 .../cli}/src/assembly/resources/sbin/start-cli.bat |    0
 .../cli}/src/assembly/resources/sbin/start-cli.sh  |    0
 .../src/assembly/resources/tools/export-csv.bat    |    0
 .../src/assembly/resources/tools/export-csv.sh     |    0
 .../src/assembly/resources/tools/export-tsfile.bat |    0
 .../src/assembly/resources/tools/export-tsfile.sh  |    0
 .../src/assembly/resources/tools/import-csv.bat    |    0
 .../src/assembly/resources/tools/import-csv.sh     |    0
 .../src/assembly/resources/tools/load-tsfile.bat   |    0
 .../src/assembly/resources/tools/load-tsfile.sh    |    0
 .../java/org/apache/iotdb/cli/AbstractCli.java     |    0
 .../src/main/java/org/apache/iotdb/cli/Cli.java    |    0
 .../apache/iotdb/cli/IoTDBSyntaxHighlighter.java   |    0
 .../org/apache/iotdb/cli/utils/IoTPrinter.java     |    0
 .../org/apache/iotdb/cli/utils/JlineUtils.java     |    0
 .../apache/iotdb/exception/ArgsErrorException.java |    0
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |    0
 .../org/apache/iotdb/tool/AbstractTsFileTool.java  |    0
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |    0
 .../java/org/apache/iotdb/tool/ExportTsFile.java   |    0
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |    0
 .../java/org/apache/iotdb/cli/AbstractCliIT.java   |    0
 .../java/org/apache/iotdb/cli/AbstractScript.java  |    0
 .../iotdb/tool/integration/ExportCsvTestIT.java    |    0
 .../iotdb/tool/integration/ImportCsvTestIT.java    |    0
 .../apache/iotdb/tool/unit/WriteCsvFileTestUT.java |    0
 .../cli}/src/test/resources/logback.xml            |    0
 {client-cpp => iotdb-client/client-cpp}/README.md  |    0
 {client-cpp => iotdb-client/client-cpp}/pom.xml    |   24 +-
 .../client-cpp}/src/assembly/client-cpp.xml        |    0
 .../client-cpp}/src/main/CMakeLists.txt            |    0
 .../client-cpp}/src/main/Session.cpp               |    0
 .../client-cpp}/src/main/Session.h                 |    0
 .../client-cpp}/src/test/CMakeLists.txt            |    0
 .../client-cpp}/src/test/cpp/sessionIT.cpp         |    0
 .../client-cpp}/src/test/main.cpp                  |    0
 iotdb-client/client-go                             |    1 +
 {client-py => iotdb-client/client-py}/.flake8      |    0
 {client-py => iotdb-client/client-py}/.gitignore   |    0
 {client-py => iotdb-client/client-py}/README.md    |    0
 .../client-py}/SessionAlignedTimeseriesExample.py  |    0
 .../client-py}/SessionExample.py                   |    0
 .../client-py}/iotdb/IoTDBContainer.py             |    0
 .../client-py}/iotdb/Session.py                    |    0
 .../client-py}/iotdb/__init__.py                   |    0
 .../client-py}/iotdb/dbapi/Connection.py           |    0
 .../client-py}/iotdb/dbapi/Cursor.py               |    0
 .../client-py}/iotdb/dbapi/Exceptions.py           |    0
 .../client-py}/iotdb/dbapi/__init__.py             |    0
 .../client-py}/iotdb/dbapi/tests/__init__.py       |    0
 .../iotdb/dbapi/tests/test_connection.py           |    0
 .../client-py}/iotdb/dbapi/tests/test_cursor.py    |    0
 .../client-py}/iotdb/sqlalchemy/IoTDBDialect.py    |    0
 .../iotdb/sqlalchemy/IoTDBIdentifierPreparer.py    |    0
 .../iotdb/sqlalchemy/IoTDBSQLCompiler.py           |    0
 .../iotdb/sqlalchemy/IoTDBTypeCompiler.py          |    0
 .../client-py}/iotdb/sqlalchemy/__init__.py        |    0
 .../client-py}/iotdb/sqlalchemy/tests/__init__.py  |    0
 .../iotdb/sqlalchemy/tests/test_dialect.py         |    0
 .../client-py}/iotdb/template/InternalNode.py      |    0
 .../client-py}/iotdb/template/MeasurementNode.py   |    0
 .../client-py}/iotdb/template/Template.py          |    0
 .../client-py}/iotdb/template/TemplateNode.py      |    0
 .../client-py}/iotdb/template/TemplateQueryType.py |    0
 .../client-py}/iotdb/template/__init__.py          |    0
 .../client-py}/iotdb/tsfile/__init__.py            |    0
 .../client-py}/iotdb/tsfile/common/__init__.py     |    0
 .../iotdb/tsfile/common/constant/TsFileConstant.py |    0
 .../iotdb/tsfile/common/constant/__init__.py       |    0
 .../client-py}/iotdb/tsfile/utils/Pair.py          |    0
 .../iotdb/tsfile/utils/ReadWriteIOUtils.py         |    0
 .../client-py}/iotdb/tsfile/utils/__init__.py      |    0
 .../client-py}/iotdb/utils/BitMap.py               |    0
 .../client-py}/iotdb/utils/Field.py                |    0
 .../iotdb/utils/IoTDBConnectionException.py        |    0
 .../client-py}/iotdb/utils/IoTDBConstants.py       |    0
 .../client-py}/iotdb/utils/IoTDBRpcDataSet.py      |    0
 .../client-py}/iotdb/utils/NumpyTablet.py          |    0
 .../client-py}/iotdb/utils/RowRecord.py            |    0
 .../client-py}/iotdb/utils/SessionDataSet.py       |    0
 .../client-py}/iotdb/utils/Tablet.py               |    0
 .../client-py}/iotdb/utils/__init__.py             |    0
 {client-py => iotdb-client/client-py}/pom.xml      |    6 +-
 .../client-py}/pyproject.toml                      |    0
 {client-py => iotdb-client/client-py}/release.sh   |    0
 .../client-py}/requirements.txt                    |    0
 .../client-py}/requirements_dev.txt                |    0
 {client-py => iotdb-client/client-py}/setup.py     |    0
 .../client-py}/tests/__init__.py                   |    0
 .../tests/tablet_performance_comparison.py         |    0
 .../client-py}/tests/test_aligned_timeseries.py    |    0
 .../client-py}/tests/test_dataframe.py             |    0
 .../client-py}/tests/test_delete_data.py           |    0
 .../client-py}/tests/test_numpy_tablet.py          |    0
 .../client-py}/tests/test_one_device.py            |    0
 .../client-py}/tests/test_session.py               |    0
 .../client-py}/tests/test_tablet.py                |    0
 .../client-py}/tests/test_template.py              |    0
 .../client-py}/tests/test_todf.py                  |    0
 .../compile-tools}/README.md                       |    0
 .../compile-tools}/pom.xml                         |    2 +-
 .../compile-tools}/thrift/pom.xml                  |    0
 {isession => iotdb-client/isession}/pom.xml        |    1 +
 .../java/org/apache/iotdb/isession/ISession.java   |    0
 .../org/apache/iotdb/isession/SessionConfig.java   |    0
 .../org/apache/iotdb/isession/SessionDataSet.java  |    0
 .../apache/iotdb/isession/pool/ISessionPool.java   |    0
 .../iotdb/isession/pool/SessionDataSetWrapper.java |    0
 .../apache/iotdb/isession/template/Template.java   |    0
 .../iotdb/isession/template/TemplateNode.java      |    0
 .../apache/iotdb/isession/util/SystemStatus.java   |    0
 .../org/apache/iotdb/isession/util/Version.java    |    0
 {jdbc => iotdb-client/jdbc}/README.md              |    0
 {jdbc => iotdb-client/jdbc}/osgi.bnd               |    0
 {jdbc => iotdb-client/jdbc}/pom.xml                |    2 +-
 .../jdbc}/src/main/feature/feature.xml             |    0
 .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java     |    0
 .../main/java/org/apache/iotdb/jdbc/Activator.java |    0
 .../main/java/org/apache/iotdb/jdbc/Config.java    |    0
 .../main/java/org/apache/iotdb/jdbc/Constant.java  |    0
 .../src/main/java/org/apache/iotdb/jdbc/Field.java |    0
 .../iotdb/jdbc/GroupedLSBWatermarkEncoder.java     |    0
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |    0
 .../apache/iotdb/jdbc/IoTDBConnectionParams.java   |    0
 .../org/apache/iotdb/jdbc/IoTDBDataSource.java     |    0
 .../apache/iotdb/jdbc/IoTDBDataSourceFactory.java  |    0
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |    0
 .../java/org/apache/iotdb/jdbc/IoTDBDriver.java    |    0
 .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java  |    0
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |    0
 .../apache/iotdb/jdbc/IoTDBPreparedStatement.java  |    0
 .../org/apache/iotdb/jdbc/IoTDBResultMetadata.java |    0
 .../org/apache/iotdb/jdbc/IoTDBSQLException.java   |    0
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |    0
 .../org/apache/iotdb/jdbc/IoTDBTracingInfo.java    |    0
 .../org/apache/iotdb/jdbc/IoTDBURLException.java   |    0
 .../java/org/apache/iotdb/jdbc/StringUtils.java    |    0
 .../src/main/java/org/apache/iotdb/jdbc/Utils.java |    0
 .../org/apache/iotdb/jdbc/WatermarkEncoder.java    |    0
 .../resources/services/META-INF/java.sql.Driver    |    0
 .../test/java/org/apache/iotdb/jdbc/BatchTest.java |    0
 .../org/apache/iotdb/jdbc/IoTDBConnectionTest.java |    0
 .../iotdb/jdbc/IoTDBDatabaseMetadataTest.java      |    0
 .../apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java  |    0
 .../iotdb/jdbc/IoTDBPreparedStatementTest.java     |    0
 .../apache/iotdb/jdbc/IoTDBResultMetadataTest.java |    0
 .../org/apache/iotdb/jdbc/IoTDBStatementTest.java  |    0
 .../test/java/org/apache/iotdb/jdbc/UtilsTest.java |    0
 {service-rpc => iotdb-client/service-rpc}/pom.xml  |    2 +-
 .../org/apache/iotdb/rpc/AutoResizingBuffer.java   |    3 -
 .../iotdb/rpc/AutoScalingBufferReadTransport.java  |    0
 .../iotdb/rpc/AutoScalingBufferWriteTransport.java |    0
 .../apache/iotdb/rpc/BatchExecutionException.java  |    0
 .../iotdb/rpc/ConfigNodeConnectionException.java   |    0
 .../apache/iotdb/rpc/ConfigurableTByteBuffer.java  |    0
 .../iotdb/rpc/InfluxDBSynchronizedHandler.java     |    0
 .../apache/iotdb/rpc/IoTDBConnectionException.java |    0
 .../org/apache/iotdb/rpc/IoTDBJDBCDataSet.java     |    0
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |    0
 .../apache/iotdb/rpc/NoValidValueException.java    |    0
 .../org/apache/iotdb/rpc/NonOpenTransport.java     |    0
 .../org/apache/iotdb/rpc/RedirectException.java    |    0
 .../main/java/org/apache/iotdb/rpc/RpcStat.java    |    0
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |    0
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |    0
 .../iotdb/rpc/StatementExecutionException.java     |    0
 .../org/apache/iotdb/rpc/SynchronizedHandler.java  |    0
 .../rpc/TCompressedElasticFramedTransport.java     |    0
 .../org/apache/iotdb/rpc/TConfigurationConst.java  |    0
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |    0
 .../iotdb/rpc/TNonblockingSocketWrapper.java       |    0
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |    0
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |    0
 .../java/org/apache/iotdb/rpc/TSocketWrapper.java  |    0
 .../rpc/TimeoutChangeableTFastFramedTransport.java |    0
 .../TimeoutChangeableTSnappyFramedTransport.java   |    0
 .../iotdb/rpc/TimeoutChangeableTransport.java      |    0
 .../org/apache/iotdb/rpc/TSStatusCodeTest.java     |    0
 {session => iotdb-client/session}/pom.xml          |    2 +-
 .../org/apache/iotdb/session/InsertConsumer.java   |    0
 .../java/org/apache/iotdb/session/Session.java     |    7 -
 .../apache/iotdb/session/SessionConnection.java    |    0
 .../org/apache/iotdb/session/pool/SessionPool.java |   16 -
 .../iotdb/session/template/InternalNode.java       |    0
 .../iotdb/session/template/MeasurementNode.java    |    0
 .../iotdb/session/template/TemplateQueryType.java  |    0
 .../apache/iotdb/session/util/SessionUtils.java    |    0
 .../org/apache/iotdb/session/util/ThreadUtils.java |    0
 .../apache/iotdb/session/SessionCacheLeaderUT.java |    0
 .../java/org/apache/iotdb/session/TabletTest.java  |    0
 .../apache/iotdb/session/pool/SessionPoolTest.java |    0
 .../apache/iotdb/session/util/ThreadUtilsTest.java |    0
 .../src/test/resources/iotdb-common.properties     |    0
 .../src/test/resources/iotdb-datanode.properties   |    0
 .../session}/src/test/resources/logback.xml        |    0
 .../flink-iotdb-connector}/README.md               |    0
 .../flink-iotdb-connector}/pom.xml                 |    2 +-
 .../iotdb/flink/DefaultIoTSerializationSchema.java |    0
 .../main/java/org/apache/iotdb/flink/Event.java    |    0
 .../java/org/apache/iotdb/flink/IoTDBSink.java     |    0
 .../java/org/apache/iotdb/flink/IoTDBSource.java   |    0
 .../apache/iotdb/flink/IoTSerializationSchema.java |    0
 .../apache/iotdb/flink/options/IoTDBOptions.java   |    0
 .../iotdb/flink/options/IoTDBSinkOptions.java      |    0
 .../iotdb/flink/options/IoTDBSourceOptions.java    |    0
 .../flink/DefaultIoTSerializationSchemaTest.java   |    0
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      |    0
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       |    0
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    |    0
 .../flink-tsfile-connector}/README.md              |    0
 .../flink-tsfile-connector}/pom.xml                |    2 +-
 .../apache/iotdb/flink/tsfile/RowRecordParser.java |    0
 .../iotdb/flink/tsfile/RowRowRecordParser.java     |    0
 .../iotdb/flink/tsfile/RowTSRecordConverter.java   |    0
 .../iotdb/flink/tsfile/TSRecordConverter.java      |    0
 .../iotdb/flink/tsfile/TSRecordOutputFormat.java   |    0
 .../iotdb/flink/tsfile/TsFileInputFormat.java      |    0
 .../iotdb/flink/tsfile/TsFileOutputFormat.java     |    0
 .../iotdb/flink/tsfile/util/TSFileConfigUtil.java  |    0
 .../RowTSRecordOutputFormatIntegrationTest.java    |    0
 .../flink/tsfile/RowTSRecordOutputFormatTest.java  |    0
 .../flink/tsfile/RowTsFileConnectorTestBase.java   |    0
 .../RowTsFileInputFormatIntegrationTest.java       |    0
 .../flink/tsfile/RowTsFileInputFormatTest.java     |    0
 .../flink/tsfile/RowTsFileInputFormatTestBase.java |    0
 .../tsfile/RowTsFileOutputFormatTestBase.java      |    0
 .../util/TSFileConfigUtilCompletenessTest.java     |    0
 .../apache/iotdb/flink/util/TsFileWriteUtil.java   |    0
 .../grafana-connector}/img/add_data_source.png     |  Bin
 .../grafana-connector}/img/add_graph.png           |  Bin
 .../grafana-connector}/img/edit_data_source.png    |  Bin
 .../grafana-connector}/pom.xml                     |    2 +-
 .../grafana-connector}/readme.md                   |    0
 .../grafana-connector}/readme_zh.md                |    0
 .../web/grafana/TsfileWebDemoApplication.java      |    0
 .../apache/iotdb/web/grafana/bean/TimeValues.java  |    0
 .../iotdb/web/grafana/conf/MyConfiguration.java    |    0
 .../controller/DatabaseConnectController.java      |    0
 .../org/apache/iotdb/web/grafana/dao/BasicDao.java |    0
 .../iotdb/web/grafana/dao/impl/BasicDaoImpl.java   |    0
 .../web/grafana/interceptor/LoginInterceptor.java  |    0
 .../grafana/service/DatabaseConnectService.java    |    0
 .../service/impl/DatabaseConnectServiceImpl.java   |    0
 .../src/main/resources/application.properties      |    0
 .../web/grafana/dao/impl/BasicDaoImplTest.java     |    0
 .../grafana-plugin}/.gitignore                     |    0
 .../grafana-plugin}/.prettierrc.js                 |    0
 .../grafana-plugin}/CHANGELOG.md                   |    0
 .../grafana-plugin}/LICENSE                        |    0
 .../grafana-plugin}/Magefile.go                    |    0
 .../grafana-plugin}/README.md                      |    0
 .../grafana-plugin}/backend-compile.bat            |    0
 .../grafana-plugin}/backend-compile.sh             |    0
 .../grafana-plugin}/go.mod                         |    0
 .../grafana-plugin}/jest.config.js                 |    0
 .../grafana-plugin}/package.json                   |    0
 .../grafana-plugin}/pkg/main.go                    |    0
 .../pkg/plugin/iotdb_resource_handler.go           |    0
 .../grafana-plugin}/pkg/plugin/plugin.go           |    0
 .../grafana-plugin}/pom.xml                        |    2 +-
 .../grafana-plugin}/src/ConfigEditor.tsx           |    0
 .../grafana-plugin}/src/QueryEditor.tsx            |    0
 .../src/componments/AggregateFun.tsx               |    0
 .../src/componments/ControlValue.tsx               |    0
 .../grafana-plugin}/src/componments/FillValue.tsx  |    0
 .../grafana-plugin}/src/componments/Form.tsx       |    0
 .../grafana-plugin}/src/componments/FromValue.tsx  |    0
 .../grafana-plugin}/src/componments/GroupBy.tsx    |    0
 .../src/componments/SelectValue.tsx                |    0
 .../grafana-plugin}/src/componments/TimeSeries.tsx |    0
 .../grafana-plugin}/src/componments/WhereValue.tsx |    0
 .../grafana-plugin}/src/datasource.ts              |    0
 .../grafana-plugin}/src/functions.ts               |    0
 .../grafana-plugin}/src/img/addIoTDBDataSource.png |  Bin
 .../grafana-plugin}/src/img/logo.svg               |    0
 .../grafana-plugin}/src/img/showData.png           |  Bin
 .../grafana-plugin}/src/module.ts                  |    0
 .../grafana-plugin}/src/plugin.json                |    0
 .../grafana-plugin}/src/types.ts                   |    0
 .../grafana-plugin}/tsconfig.json                  |    0
 .../grafana-plugin}/yarn.lock                      |    0
 {hadoop => iotdb-connector/hadoop}/README.md       |    0
 {hadoop => iotdb-connector/hadoop}/pom.xml         |    2 +-
 .../iotdb/hadoop/fileSystem/HDFSConfUtil.java      |    0
 .../apache/iotdb/hadoop/fileSystem/HDFSFile.java   |    0
 .../apache/iotdb/hadoop/fileSystem/HDFSInput.java  |    0
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java |    0
 .../org/apache/iotdb/hadoop/tsfile/IReaderSet.java |    0
 .../iotdb/hadoop/tsfile/TSFHadoopException.java    |    0
 .../apache/iotdb/hadoop/tsfile/TSFInputFormat.java |    0
 .../apache/iotdb/hadoop/tsfile/TSFInputSplit.java  |    0
 .../iotdb/hadoop/tsfile/TSFOutputFormat.java       |    0
 .../iotdb/hadoop/tsfile/TSFRecordReader.java       |    0
 .../iotdb/hadoop/tsfile/TSFRecordWriter.java       |    0
 .../iotdb/hadoop/tsfile/record/HDFSTSRecord.java   |    0
 .../apache/iotdb/hadoop/tsfile/TSFHadoopTest.java  |    0
 .../iotdb/hadoop/tsfile/TSFInputSplitTest.java     |    0
 .../iotdb/hadoop/tsfile/TsFileTestHelper.java      |    0
 .../iotdb/hadoop/tsfile/constant/TestConstant.java |    0
 .../hadoop}/src/test/resources/logback.xml         |    0
 .../hive-connector}/pom.xml                        |    2 +-
 .../org/apache/iotdb/hive/TSFHiveInputFormat.java  |    0
 .../org/apache/iotdb/hive/TSFHiveOutputFormat.java |    0
 .../org/apache/iotdb/hive/TSFHiveRecordReader.java |    0
 .../org/apache/iotdb/hive/TSFHiveRecordWriter.java |    0
 .../org/apache/iotdb/hive/TsFileDeserializer.java  |    0
 .../java/org/apache/iotdb/hive/TsFileSerDe.java    |    0
 .../apache/iotdb/hive/TsFileSerDeException.java    |    0
 .../apache/iotdb/hive/TSFHiveInputFormatTest.java  |    0
 .../apache/iotdb/hive/TSFHiveRecordReaderTest.java |    0
 .../apache/iotdb/hive/TsFileDeserializerTest.java  |    0
 .../org/apache/iotdb/hive/TsFileSerDeTest.java     |    0
 .../org/apache/iotdb/hive/TsFileTestHelper.java    |    0
 .../apache/iotdb/hive/constant/TestConstant.java   |    0
 .../hive-connector}/src/test/resources/logback.xml |    0
 .../spark-iotdb-connector}/pom.xml                 |    2 +-
 .../spark-iotdb-connector}/scala_2.11/pom.xml      |    2 +-
 .../spark-iotdb-connector}/scala_2.12/pom.xml      |    2 +-
 .../org/apache/iotdb/spark/db/Converter.scala      |    0
 .../org/apache/iotdb/spark/db/DataFrameTools.scala |    0
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |    0
 .../org/apache/iotdb/spark/db/IoTDBOptions.scala   |    0
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |    0
 .../org/apache/iotdb/spark/db/IoTDBRelation.scala  |    0
 .../org/apache/iotdb/spark/db/SQLConstant.scala    |    0
 .../org/apache/iotdb/spark/db/Transformer.scala    |    0
 .../scala/org/apache/iotdb/spark/db/package.scala  |    0
 .../spark-tsfile}/pom.xml                          |    2 +-
 .../iotdb/spark/tsfile/io/TsFileOutputFormat.java  |    0
 .../iotdb/spark/tsfile/io/TsFileRecordWriter.java  |    0
 .../org/apache/iotdb/spark/tsfile/qp/Executor.java |    0
 .../iotdb/spark/tsfile/qp/QueryProcessor.java      |    0
 .../spark/tsfile/qp/common/BasicOperator.java      |    0
 .../spark/tsfile/qp/common/FilterOperator.java     |    0
 .../iotdb/spark/tsfile/qp/common/Operator.java     |    0
 .../iotdb/spark/tsfile/qp/common/SQLConstant.java  |    0
 .../iotdb/spark/tsfile/qp/common/SingleQuery.java  |    0
 .../iotdb/spark/tsfile/qp/common/TSQueryPlan.java  |    0
 .../qp/exception/BasicOperatorException.java       |    0
 .../tsfile/qp/exception/DNFOptimizeException.java  |    0
 .../qp/exception/LogicalOptimizeException.java     |    0
 .../tsfile/qp/exception/MergeFilterException.java  |    0
 .../qp/exception/QueryOperatorException.java       |    0
 .../qp/exception/QueryProcessorException.java      |    0
 .../tsfile/qp/exception/RemoveNotException.java    |    0
 .../tsfile/qp/optimizer/DNFFilterOptimizer.java    |    0
 .../tsfile/qp/optimizer/IFilterOptimizer.java      |    0
 .../qp/optimizer/MergeSingleFilterOptimizer.java   |    0
 .../tsfile/qp/optimizer/PhysicalOptimizer.java     |    0
 .../tsfile/qp/optimizer/RemoveNotOptimizer.java    |    0
 .../org/apache/iotdb/spark/tsfile/Converter.scala  |    0
 .../apache/iotdb/spark/tsfile/DefaultSource.scala  |    0
 .../iotdb/spark/tsfile/NarrowConverter.scala       |    0
 .../spark/tsfile/NarrowTsFileOutputWriter.scala    |    0
 .../apache/iotdb/spark/tsfile/Transformer.scala    |    0
 .../iotdb/spark/tsfile/TsFileWriterFactory.scala   |    0
 .../apache/iotdb/spark/tsfile/WideConverter.scala  |    0
 .../spark/tsfile/WideTsFileOutputWriter.scala      |    0
 .../org/apache/iotdb/spark/tsfile/package.scala    |    0
 .../apache/iotdb/spark/constant/TestConstant.java  |    0
 .../org/apache/iotdb/spark/tool/TsFileExample.java |    0
 .../apache/iotdb/spark/tool/TsFileWriteTool.java   |    0
 .../apache/iotdb/spark/tsfile/ConverterTest.scala  |    0
 .../apache/iotdb/spark/tsfile/HDFSInputTest.java   |    0
 .../org/apache/iotdb/spark/tsfile/TSFileSuit.scala |    0
 .../zeppelin-interpreter}/IoTDB-Zeppelin-Demo.zpln |    0
 .../zeppelin-interpreter}/pom.xml                  |    2 +-
 .../apache/zeppelin/iotdb/IoTDBInterpreter.java    |    0
 .../src/main/resources/interpreter-setting.json    |    0
 {openapi => iotdb-protocol/openapi}/pom.xml        |    2 +-
 .../src/main/openapi3/iotdb_rest_common.yaml       |    0
 .../openapi}/src/main/openapi3/iotdb_rest_v1.yaml  |    0
 .../openapi}/src/main/openapi3/iotdb_rest_v2.yaml  |    0
 .../thrift-commons}/pom.xml                        |    2 +-
 .../thrift-commons}/src/main/thrift/common.thrift  |    0
 .../thrift-confignode}/pom.xml                     |    2 +-
 .../src/main/thrift/confignode.thrift              |    1 +
 .../thrift-influxdb}/README.md                     |    0
 .../thrift-influxdb}/pom.xml                       |    2 +-
 .../src/main/thrift/influxdb.thrift                |    0
 .../thrift-iot-consensus}/pom.xml                  |    2 +-
 .../src/main/thrift/iotconsensus.thrift            |    0
 .../thrift-mlnode}/pom.xml                         |    2 +-
 .../thrift-mlnode}/src/main/thrift/mlnode.thrift   |    0
 {thrift => iotdb-protocol/thrift}/README.md        |    0
 {thrift => iotdb-protocol/thrift}/pom.xml          |    2 +-
 .../thrift}/rpc-changelist.md                      |    0
 .../thrift}/src/main/thrift/client.thrift          |    0
 .../thrift}/src/main/thrift/datanode.thrift        |    2 +
 .../java/org/apache/iotdb/metrics/type/Gauge.java  |    1 -
 .../apache/iotdb/commons/conf/CommonConfig.java    |   10 +
 .../iotdb/commons/conf/CommonDescriptor.java       |    3 +
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |   13 +-
 .../schema/view/viewExpression/ViewExpression.java |    2 +
 .../{enums => }/PerformanceOverviewMetrics.java    |    4 +-
 .../iotdb/commons/service/metric/enums/Metric.java |  192 +--
 .../iotdb/commons/service/metric/enums/Tag.java    |   26 +-
 pom.xml                                            |   68 +-
 rewrite-tsfile-tool/pom.xml                        |   83 --
 .../src/assembly/resources/sbin/rewrite-tsfile.bat |   60 -
 .../src/assembly/resources/sbin/rewrite-tsfile.sh  |   48 -
 rewrite-tsfile-tool/src/assembly/rewriteTsFile.xml |   40 -
 .../java/org/apache/iotdb/RewriteTsFileTool.java   |  914 ------------
 schema-engine-rocksdb/README.md                    |   38 -
 schema-engine-rocksdb/pom.xml                      |   79 -
 .../resources/conf/schema-rocksdb.properties       |   94 --
 .../src/assembly/schema-engine-rocksdb.xml         |   44 -
 .../schemaregion/rocksdb/CheckKeyResult.java       |   50 -
 .../schemaregion/rocksdb/RSchemaConfLoader.java    |  185 ---
 .../schemaregion/rocksdb/RSchemaConstants.java     |   76 -
 .../schemaregion/rocksdb/RSchemaLogger.java        |   61 -
 .../rocksdb/RSchemaReadWriteHandler.java           |  515 -------
 .../schemaregion/rocksdb/RSchemaRegion.java        | 1558 --------------------
 .../schemaregion/rocksdb/RSchemaUtils.java         |  592 --------
 .../schemaregion/rocksdb/mnode/RDatabaseMNode.java |  117 --
 .../schemaregion/rocksdb/mnode/RDeviceMNode.java   |  171 ---
 .../schemaregion/rocksdb/mnode/RInternalMNode.java |  171 ---
 .../schemaregion/rocksdb/mnode/RMNode.java         |  230 ---
 .../schemaregion/rocksdb/mnode/RMNodeType.java     |   49 -
 .../rocksdb/mnode/RMNodeValueType.java             |   47 -
 .../rocksdb/mnode/RMeasurementMNode.java           |  249 ----
 .../schemaregion/rocksdb/MRocksDBBenchmark.java    |   98 --
 .../schemaregion/rocksdb/MRocksDBUnitTest.java     |  265 ----
 .../rocksdb/RSchemaReadWriteHandlerTest.java       |   77 -
 .../rocksdb/RocksDBBenchmarkEngine.java            |  144 --
 .../schemaregion/rocksdb/RocksDBBenchmarkTask.java |  109 --
 .../schemaregion/rocksdb/RocksDBTestUtils.java     |   72 -
 schema-engine-tag/README.md                        |  190 ---
 schema-engine-tag/pom.xml                          |   81 -
 .../assembly/resources/conf/schema-tag.properties  |   33 -
 .../src/assembly/schema-engine-tag.xml             |   44 -
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  611 --------
 .../tagSchemaRegion/config/TagSchemaConfig.java    |   55 -
 .../config/TagSchemaDescriptor.java                |   93 --
 .../idtable/IDTableWithDeviceIDListImpl.java       |  138 --
 .../tagIndex/ITagInvertedIndex.java                |   61 -
 .../tagIndex/Request/DeletionRequest.java          |   55 -
 .../tagIndex/Request/InsertionRequest.java         |   59 -
 .../tagIndex/Request/QueryRequest.java             |   46 -
 .../tagSchemaRegion/tagIndex/TagInvertedIndex.java |  183 ---
 .../tagIndex/deletion/MemChunkDeletion.java        |   57 -
 .../tagIndex/deletion/MemChunkGroupDeletion.java   |   68 -
 .../tagIndex/deletion/MemTableDeletion.java        |   77 -
 .../tagIndex/deletion/MemTableGroupDeletion.java   |   66 -
 .../tagIndex/insertion/MemChunkGroupInsertion.java |   65 -
 .../tagIndex/insertion/MemChunkInsertion.java      |   58 -
 .../tagIndex/insertion/MemTableGroupInsertion.java |   74 -
 .../tagIndex/insertion/MemTableInsertion.java      |   67 -
 .../tagIndex/memtable/MemChunk.java                |   54 -
 .../tagIndex/memtable/MemChunkGroup.java           |   56 -
 .../tagIndex/memtable/MemTable.java                |   87 --
 .../tagIndex/memtable/MemTableGroup.java           |  102 --
 .../tagIndex/query/MemChunkGroupQuery.java         |   61 -
 .../tagIndex/query/MemChunkQuery.java              |   67 -
 .../tagIndex/query/MemTableGroupQuery.java         |   59 -
 .../tagIndex/query/MemTableQuery.java              |   72 -
 .../tagIndex/response/QueryResponse.java           |   71 -
 .../tagSchemaRegion/tagIndex/wal/WALEntry.java     |  134 --
 .../tagSchemaRegion/tagIndex/wal/WALManager.java   |  141 --
 .../utils/MeasurementPathUtils.java                |   80 -
 .../utils/PathTagConverterUtils.java               |   70 -
 .../utils/ShowTimeSeriesResultUtils.java           |   82 --
 .../iotdb/lsm/annotation/DeletionProcessor.java    |   36 -
 .../iotdb/lsm/annotation/InsertionProcessor.java   |   36 -
 .../iotdb/lsm/annotation/QueryProcessor.java       |   36 -
 .../applicationcontext/ApplicationContext.java     |   61 -
 .../ApplicationContextGenerator.java               |  119 --
 .../requestcontext/DeleteRequestContext.java       |   34 -
 .../requestcontext/FlushRequestContext.java        |   33 -
 .../requestcontext/InsertRequestContext.java       |   34 -
 .../requestcontext/QueryRequestContext.java        |   34 -
 .../lsm/context/requestcontext/RequestContext.java |  108 --
 .../org/apache/iotdb/lsm/engine/ILSMEngine.java    |   73 -
 .../org/apache/iotdb/lsm/engine/IRecoverable.java  |   34 -
 .../org/apache/iotdb/lsm/engine/LSMEngine.java     |  176 ---
 .../apache/iotdb/lsm/engine/LSMEngineBuilder.java  |  290 ----
 .../lsm/levelProcess/BasicLevelProcessor.java      |   79 -
 .../lsm/levelProcess/DeleteLevelProcessor.java     |   39 -
 .../lsm/levelProcess/FlushLevelProcessor.java      |   38 -
 .../iotdb/lsm/levelProcess/ILevelProcessor.java    |   41 -
 .../lsm/levelProcess/InsertLevelProcessor.java     |   39 -
 .../lsm/levelProcess/LevelProcessorChain.java      |   45 -
 .../lsm/levelProcess/QueryLevelProcessor.java      |   39 -
 .../apache/iotdb/lsm/manager/BasicLSMManager.java  |   55 -
 .../apache/iotdb/lsm/manager/DeletionManager.java  |   48 -
 .../org/apache/iotdb/lsm/manager/ILSMManager.java  |   58 -
 .../apache/iotdb/lsm/manager/InsertionManager.java |   48 -
 .../org/apache/iotdb/lsm/manager/QueryManager.java |   32 -
 .../apache/iotdb/lsm/manager/RecoverManager.java   |   49 -
 .../org/apache/iotdb/lsm/manager/WALManager.java   |  126 --
 .../apache/iotdb/lsm/request/IDeletionRequest.java |   30 -
 .../iotdb/lsm/request/IInsertionRequest.java       |   30 -
 .../apache/iotdb/lsm/request/IQueryRequest.java    |   35 -
 .../org/apache/iotdb/lsm/request/IRequest.java     |   61 -
 .../org/apache/iotdb/lsm/request/RequestType.java  |   27 -
 .../org/apache/iotdb/lsm/response/IResponse.java   |   46 -
 .../iotdb/lsm/strategy/BFSAccessStrategy.java      |   66 -
 .../apache/iotdb/lsm/strategy/IAccessStrategy.java |   36 -
 .../lsm/strategy/PostOrderAccessStrategy.java      |   56 -
 .../iotdb/lsm/strategy/PreOrderAccessStrategy.java |   54 -
 .../iotdb/lsm/strategy/RBFSAccessStrategy.java     |   85 --
 .../java/org/apache/iotdb/lsm/wal/IWALReader.java  |   48 -
 .../java/org/apache/iotdb/lsm/wal/IWALRecord.java  |   50 -
 .../java/org/apache/iotdb/lsm/wal/IWALWriter.java  |   47 -
 .../java/org/apache/iotdb/lsm/wal/WALReader.java   |   99 --
 .../java/org/apache/iotdb/lsm/wal/WALWriter.java   |  111 --
 .../tagSchemaRegion/TagSchemaRegionTest.java       |  233 ---
 .../idtable/IDTableWithDeviceIDListImplTest.java   |  213 ---
 .../tagIndex/TagTagInvertedIndexTest.java          |  184 ---
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |    2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |    4 +-
 .../apache/iotdb/db/engine/cache/ChunkCache.java   |   11 +-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |    7 +-
 .../execute/task/AbstractCompactionTask.java       |    6 +-
 .../execute/task/CrossSpaceCompactionTask.java     |    4 +-
 .../execute/task/InnerSpaceCompactionTask.java     |    4 +-
 .../readchunk/AlignedSeriesCompactionExecutor.java |    8 +-
 .../readchunk/SingleSeriesCompactionExecutor.java  |   10 +-
 .../utils/writer/AbstractCompactionWriter.java     |    4 +-
 .../schedule/constant/CompactionType.java          |   17 +-
 .../schedule/constant/ProcessChunkType.java        |   17 +-
 .../iotdb/db/engine/flush/FlushManagerMetrics.java |    8 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |    3 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   11 +-
 .../db/engine/storagegroup/DataRegionMetrics.java  |    8 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   21 +-
 .../metadata/cache/DataNodeSchemaCacheMetrics.java |   12 +-
 .../db/metadata/metric/SchemaEngineMemMetric.java  |    5 +-
 .../impl/write/CreateLogicalViewPlanImpl.java      |    6 +-
 .../impl/write/SchemaRegionWritePlanFactory.java   |    2 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |   10 +
 .../iotdb/db/mpp/aggregation/Aggregator.java       |   15 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |    2 +
 .../mpp/common/schematree/ClusterSchemaTree.java   |   27 +-
 .../db/mpp/common/schematree/ISchemaTree.java      |    7 +
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |    3 +-
 .../iotdb/db/mpp/execution/driver/Driver.java      |    5 +-
 .../execution/exchange/MPPDataExchangeManager.java |   22 +-
 .../exchange/MPPDataExchangeServiceMetrics.java    |    8 -
 ...MppDataExchangeServiceThriftHandlerMetrics.java |   35 +-
 .../execution/exchange/sink/LocalSinkChannel.java  |    7 +-
 .../execution/exchange/sink/ShuffleSinkHandle.java |    8 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |   15 +-
 .../exchange/source/LocalSourceHandle.java         |   11 +-
 .../execution/exchange/source/SourceHandle.java    |   21 +-
 .../execution/executor/RegionWriteExecutor.java    |    2 +-
 .../fragment/FragmentInstanceManager.java          |    8 +-
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  |   12 +-
 .../execution/operator/source/SeriesScanUtil.java  |    9 +-
 .../db/mpp/execution/schedule/DriverScheduler.java |    9 +-
 .../iotdb/db/mpp/metric/ChunkCacheMetrics.java     |   14 +-
 .../db/mpp/metric/DataExchangeCostMetricSet.java   |  345 +++--
 .../db/mpp/metric/DataExchangeCountMetricSet.java  |  200 ++-
 .../db/mpp/metric/DriverSchedulerMetricSet.java    |   88 +-
 .../db/mpp/metric/QueryExecutionMetricSet.java     |  206 ++-
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   |  122 +-
 .../db/mpp/metric/QueryPlanCostMetricSet.java      |   86 +-
 .../db/mpp/metric/QueryResourceMetricSet.java      |   77 +-
 .../db/mpp/metric/SeriesScanCostMetricSet.java     | 1117 +++++++++-----
 .../mpp/metric/TimeSeriesMetadataCacheMetrics.java |   22 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |   27 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  277 +++-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |    4 +-
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |   37 +
 .../db/mpp/plan/analyze/cache/PartitionCache.java  |   52 +-
 .../db/mpp/plan/execution/QueryExecution.java      |   17 +-
 .../config/metadata/ShowVariablesTask.java         |    4 +
 .../iotdb/db/mpp/plan/expression/Expression.java   |   20 +
 .../plan/expression/binary/BinaryExpression.java   |   22 +
 .../plan/expression/binary/WhenThenExpression.java |    7 +
 .../db/mpp/plan/expression/leaf/LeafOperand.java   |    5 +
 .../plan/expression/multi/FunctionExpression.java  |   21 +
 .../expression/other/CaseWhenThenExpression.java   |   17 +
 .../plan/expression/ternary/BetweenExpression.java |    9 +
 .../plan/expression/ternary/TernaryExpression.java |   18 +-
 .../db/mpp/plan/expression/unary/InExpression.java |   26 +-
 .../plan/expression/unary/IsNullExpression.java    |    5 +
 .../mpp/plan/expression/unary/LikeExpression.java  |    5 +
 .../plan/expression/unary/LogicNotExpression.java  |    9 +
 .../plan/expression/unary/NegationExpression.java  |   11 +
 .../plan/expression/unary/RegularExpression.java   |    5 +
 .../mpp/plan/expression/unary/UnaryExpression.java |    6 +-
 .../visitor/CompleteMeasurementSchemaVisitor.java  |  112 ++
 .../RemoveWildcardAndViewInExpressionVisitor.java  |   95 ++
 .../RemoveWildcardAndViewInFilterVisitor.java      |  109 ++
 .../visitor/ReplaceLogicalViewVisitor.java         |  188 +++
 .../db/mpp/plan/parser/StatementGenerator.java     |    2 +-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |    4 +-
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   |    2 +-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |    7 +-
 .../scheduler/FixedRateFragInsStateTracker.java    |   14 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   10 +-
 .../plan/scheduler/IFragInstanceStateTracker.java  |    4 +-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  |   52 +-
 .../metadata/CreateLogicalViewStatement.java       |   26 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |   31 +
 .../query/reader/chunk/DiskAlignedChunkLoader.java |   10 +-
 .../db/query/reader/chunk/DiskChunkLoader.java     |   10 +-
 .../query/reader/chunk/MemAlignedChunkLoader.java  |   10 +-
 .../db/query/reader/chunk/MemChunkLoader.java      |   10 +-
 .../metadata/DiskAlignedChunkMetadataLoader.java   |   11 +-
 .../chunk/metadata/DiskChunkMetadataLoader.java    |   11 +-
 .../metadata/MemAlignedChunkMetadataLoader.java    |   10 +-
 .../chunk/metadata/MemChunkMetadataLoader.java     |    9 +-
 .../service/DataNodeInternalRPCServiceMetrics.java |    8 +-
 .../apache/iotdb/db/service/RPCServiceMetrics.java |    8 +-
 .../iotdb/db/service/metrics/CacheMetrics.java     |  144 ++
 .../db/service/metrics/CompactionMetrics.java      |  396 ++++-
 .../db/service/metrics/DataNodeMetricsHelper.java  |   20 +-
 .../iotdb/db/service/metrics/SystemMetrics.java    |   43 +-
 .../iotdb/db/service/metrics/WritingMetrics.java   |  574 +++++++-
 .../metrics/recorder/CacheMetricsRecorder.java     |   57 -
 .../metrics/recorder/CompactionMetricsManager.java |  240 ---
 .../metrics/recorder/WritingMetricsManager.java    |  351 -----
 .../InternalServiceThriftHandlerMetrics.java       |    8 +-
 .../handler/RPCServiceThriftHandlerMetrics.java    |    8 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |    4 +
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   17 +-
 .../java/org/apache/iotdb/db/wal/WALManager.java   |    6 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |    4 +-
 .../iotdb/db/wal/checkpoint/CheckpointManager.java |    4 +-
 .../iotdb/db/wal/checkpoint/CheckpointType.java    |   16 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |    4 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |    2 +-
 .../mpp/plan/analyze/ExpressionAnalyzerTest.java   |    4 +-
 872 files changed, 5322 insertions(+), 36649 deletions(-)



[iotdb] 02/05: Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 865f8914ce6cc81f853f25d521019128253c4069
Merge: 31fa754dcb6 c3ebb19b027
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 23 23:00:37 2023 +0800

    Merge branch 'master' of github.com:apache/iotdb into pipe-wal-resource-management

 docs/UserGuide/Monitor-Alert/Metric-Tool.md        |  51 +++++--
 docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md     |  66 ++++++---
 .../iotdb/commons/service/metric/enums/Metric.java |   5 +
 .../tools/tsfile/print-iotdb-data-dir.bat          |   2 +-
 .../resources/tools/tsfile/print-iotdb-data-dir.sh |   2 +-
 .../impl/SchemaRegionPlanDeserializer.java         |   8 +-
 .../impl/SchemaRegionPlanSerializer.java           |  12 +-
 .../execution/exchange/MPPDataExchangeManager.java |  13 ++
 .../fragment/FragmentInstanceManager.java          |   8 +
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  |  16 ++
 .../db/mpp/execution/schedule/DriverScheduler.java |   8 +
 .../db/mpp/metric/DataExchangeCountMetricSet.java  |  25 ++++
 .../db/mpp/metric/DriverSchedulerMetricSet.java    |  18 +++
 .../mpp/metric/QueryRelatedResourceMetricSet.java  | 162 +++++++++++++++++++++
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   4 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   3 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   4 +
 .../db/service/metrics/DataNodeMetricsHelper.java  |   2 +
 .../apache/iotdb/db/tools/IoTDBDataDirViewer.java  |  41 ++++--
 19 files changed, 396 insertions(+), 54 deletions(-)


[iotdb] 04/05: bind WALPipeHandle

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 77ce31eb61df69bd4c5dadb260df4c6aca16aaea
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 25 19:57:07 2023 +0800

    bind WALPipeHandle
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 10 +++-
 .../listener/PipeInsertionDataNodeListener.java    | 10 ++--
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  3 +-
 .../core/event/impl/PipeTabletInsertionEvent.java  | 53 +++++++++++++++-------
 .../realtime/PipeRealtimeCollectEventFactory.java  |  5 +-
 .../db/pipe/resource/wal/PipeWALResource.java      |  4 ++
 .../pipe/resource/wal/PipeWALResourceManager.java  | 11 +++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  | 14 ++++++
 .../core/collector/PipeRealtimeCollectTest.java    |  6 +++
 9 files changed, 91 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a0a36e3a1c0..6eceea951d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -277,7 +277,10 @@ public class TsFileProcessor {
 
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalPipeHandler(),
+            insertRowNode,
+            tsFileResource);
 
     if (insertRowNode.isAligned()) {
       workMemTable.insertAlignedRow(insertRowNode);
@@ -377,7 +380,10 @@ public class TsFileProcessor {
 
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalPipeHandler(),
+            insertTabletNode,
+            tsFileResource);
 
     try {
       if (insertTabletNode.isAligned()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 778a583f1c7..f2298310544 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener {
     assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
   }
 
-  // TODO: check whether the method is called on the right place.
   public void listenToInsertNode(
-      String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) {
+      String dataRegionId,
+      WALPipeHandler walPipeHandler,
+      InsertNode insertNode,
+      TsFileResource tsFileResource) {
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected
@@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener {
     }
 
     assigner.publishToAssign(
-        PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource));
+        PipeRealtimeCollectEventFactory.createCollectEvent(
+            walPipeHandler, insertNode, tsFileResource));
   }
 
   /////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6b414c03b36..65837f31d42 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
-      throws PipeException, TException {
+      throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..476f9e6f12a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
 
-  private final InsertNode insertNode;
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
 
-  private final AtomicInteger referenceCount;
+  private final WALPipeHandler walPipeHandler;
 
-  public PipeTabletInsertionEvent(InsertNode insertNode) {
-    this.insertNode = insertNode;
-    this.referenceCount = new AtomicInteger(0);
+  public PipeTabletInsertionEvent(WALPipeHandler walPipeHandler) {
+    this.walPipeHandler = walPipeHandler;
   }
 
-  public InsertNode getInsertNode() {
-    return insertNode;
+  public InsertNode getInsertNode() throws WALPipeException {
+    return walPipeHandler.getValue();
   }
 
   @Override
@@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
 
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler pinMemtable
-    referenceCount.incrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().pin(walPipeHandler.getMemTableId(), walPipeHandler);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Increase reference count for memtable %d error. Holder Message: %s",
+              walPipeHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public boolean decreaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler unpinMemetable
-    referenceCount.decrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().unpin(walPipeHandler.getMemTableId());
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Decrease reference count for memtable %d error. Holder Message: %s",
+              walPipeHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public int getReferenceCount() {
-    // TODO: use WALPipeHandler unpinMemetable
-    return referenceCount.get();
+    return PipeResourceManager.wal().getReferenceCount(walPipeHandler.getMemTableId());
   }
 
   @Override
   public String toString() {
-    return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
+    return "PipeTabletInsertionEvent{" + "walPipeHandler=" + walPipeHandler + '}';
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 4c98c5193be..a0961624dbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 
 public class PipeRealtimeCollectEventFactory {
 
@@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
   }
 
   public static PipeRealtimeCollectEvent createCollectEvent(
-      InsertNode node, TsFileResource resource) {
+      WALPipeHandler walPipeHandler, InsertNode insertNode, TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
-        new PipeTabletInsertionEvent(node), node, resource);
+        new PipeTabletInsertionEvent(walPipeHandler), insertNode, resource);
   }
 
   private PipeRealtimeCollectEventFactory() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 844420272bd..8d594629bf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -155,4 +155,8 @@ public class PipeWALResource implements AutoCloseable {
 
     referenceCount.set(0);
   }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index c187a29f781..18b942496cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -95,4 +95,15 @@ public class PipeWALResourceManager implements AutoCloseable {
       }
     }
   }
+
+  public int getReferenceCount(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
index 80333cf8ffb..3392731f148 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -133,4 +133,18 @@ public class WALPipeHandler {
   public void setSize(int size) {
     this.walEntryPosition.setSize(size);
   }
+
+  @Override
+  public String toString() {
+    return "WALPipeHandler{"
+        + "memTableId="
+        + memTableId
+        + ", value="
+        + value
+        + ", walEntryPosition="
+        + walEntryPosition
+        + ", walNode="
+        + walNode
+        + '}';
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..d18fc026678 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+import static org.mockito.Mockito.mock;
+
 public class PipeRealtimeCollectTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
@@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+
     File dataRegionDir =
         new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
     boolean ignored = dataRegionDir.mkdirs();
@@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALPipeHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALPipeHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),


[iotdb] 03/05: wal resource manager

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8bedf08b00be5bd826ae35996145b4b790875b3f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 25 02:31:42 2023 +0800

    wal resource manager
---
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  27 ++--
 .../db/pipe/resource/PipeResourceManager.java      |   9 ++
 .../{ => file}/PipeFileResourceManager.java        |   2 +-
 .../db/pipe/resource/wal/PipeWALResource.java      | 158 +++++++++++++++++++++
 .../pipe/resource/wal/PipeWALResourceManager.java  |  98 +++++++++++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  |   4 +
 .../pipe/resource/PipeFileResourceManagerTest.java |   1 +
 8 files changed, 286 insertions(+), 14 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index d156287e1f3..718c5a6bfc1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -72,6 +72,7 @@ public enum ThreadName {
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
   PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
+  PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f4f33c10473..a0a36e3a1c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -254,8 +254,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -273,17 +274,17 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+
     if (insertRowNode.isAligned()) {
       workMemTable.insertAlignedRow(insertRowNode);
     } else {
       workMemTable.insert(insertRowNode);
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
-
     // update start time of this memtable
     tsFileResource.updateStartTime(
         insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -354,9 +355,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener =
-          walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -373,6 +374,11 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+
     try {
       if (insertTabletNode.isAligned()) {
         workMemTable.insertAlignedTablet(insertTabletNode, start, end);
@@ -389,11 +395,6 @@ public class TsFileProcessor {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
-
     tsFileResource.updateStartTime(
         insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);
     // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 61b4e61a04e..43bddd872f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,18 +19,27 @@
 
 package org.apache.iotdb.db.pipe.resource;
 
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+
 public class PipeResourceManager {
 
   private final PipeFileResourceManager pipeFileResourceManager;
+  private final PipeWALResourceManager pipeWALResourceManager;
 
   public static PipeFileResourceManager file() {
     return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
   }
 
+  public static PipeWALResourceManager wal() {
+    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
     pipeFileResourceManager = new PipeFileResourceManager();
+    pipeWALResourceManager = new PipeWALResourceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index e7d961b3c9f..942ab600536 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.file;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.FileUtils;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
new file mode 100644
index 00000000000..844420272bd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
+
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeWALResource implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class);
+
+  private final WALPipeHandler walPipeHandler;
+
+  private final AtomicInteger referenceCount;
+
+  // TODO: make this configurable
+  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+  private final AtomicLong lastLogicalPinTime;
+  private final AtomicBoolean isPhysicallyPinned;
+
+  public PipeWALResource(WALPipeHandler walPipeHandler) {
+    this.walPipeHandler = walPipeHandler;
+
+    referenceCount = new AtomicInteger(0);
+
+    lastLogicalPinTime = new AtomicLong(0);
+    isPhysicallyPinned = new AtomicBoolean(false);
+  }
+
+  public void pin() throws PipeRuntimeNonCriticalException {
+    if (referenceCount.get() == 0) {
+      if (!isPhysicallyPinned.get()) {
+        try {
+          walPipeHandler.pinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to pin wal %d, because %s",
+                  walPipeHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(true);
+        LOGGER.info("wal {} is pinned by pipe engine", walPipeHandler.getMemTableId());
+      } // otherwise the wal is already physically pinned, so there is nothing to do
+
+      // whether or not the wal was already physically pinned, update the last pin time
+      lastLogicalPinTime.set(System.currentTimeMillis());
+    }
+
+    referenceCount.incrementAndGet();
+  }
+
+  public void unpin() throws PipeRuntimeNonCriticalException {
+    final int finalReferenceCount = referenceCount.get();
+
+    if (finalReferenceCount == 1) {
+      unpinPhysicallyIfOutOfTimeToLive();
+    } else if (finalReferenceCount < 1) {
+      throw new PipeRuntimeCriticalException(
+          String.format(
+              "wal %d is unpinned more than pinned, this should not happen",
+              walPipeHandler.getMemTableId()));
+    }
+
+    referenceCount.decrementAndGet();
+  }
+
+  /**
+   * Invalidate the wal if it has no remaining references and its time to live has expired.
+   *
+   * @return true if the wal is invalidated, false otherwise
+   */
+  public boolean invalidateIfPossible() {
+    if (referenceCount.get() > 0) {
+      return false;
+    }
+
+    // referenceCount.get() == 0
+    return unpinPhysicallyIfOutOfTimeToLive();
+  }
+
+  /**
+   * Physically unpin the wal if its minimum time to live has elapsed since the last pin time.
+   *
+   * @return true if the wal is not physically pinned afterwards (then it can be invalidated), false otherwise
+   */
+  private boolean unpinPhysicallyIfOutOfTimeToLive() {
+    if (isPhysicallyPinned.get()) {
+      if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
+        try {
+          walPipeHandler.unpinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to unpin wal %d, because %s",
+                  walPipeHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(false);
+        LOGGER.info(
+            "wal {} is unpinned by pipe engine when checking time to live",
+            walPipeHandler.getMemTableId());
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      LOGGER.info(
+          "wal {} is not pinned physically when checking time to live",
+          walPipeHandler.getMemTableId());
+      return true;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (isPhysicallyPinned.get()) {
+      try {
+        walPipeHandler.unpinMemTable();
+      } catch (MemTablePinException e) {
+        LOGGER.error(
+            "failed to unpin wal {} when closing pipe wal resource, because {}",
+            walPipeHandler.getMemTableId(),
+            e.getMessage());
+      }
+      isPhysicallyPinned.set(false);
+      LOGGER.info(
+          "wal {} is unpinned by pipe engine when closing pipe wal resource",
+          walPipeHandler.getMemTableId());
+    }
+
+    referenceCount.set(0);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
new file mode 100644
index 00000000000..c187a29f781
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -0,0 +1,98 @@
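+/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.  */  package org.apache.iotdb.db.pipe.resource.wal;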
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeWALResourceManager implements AutoCloseable {
+
+  private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+
+  private static final int SEGMENT_LOCK_COUNT = 32;
+  private final ReentrantLock[] memtableIdSegmentLocks;
+
+  private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+  private final ScheduledFuture<?> ttlCheckerFuture;
+
+  public PipeWALResourceManager() {
+    memtableIdToPipeWALResourceMap = new HashMap<>();
+
+    memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+    for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
+      memtableIdSegmentLocks[i] = new ReentrantLock();
+    }
+
+    ttlCheckerFuture =
+        PIPE_WAL_RESOURCE_TTL_CHECKER.scheduleAtFixedRate(
+            () -> {
+              for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+                final ReentrantLock lock =
+                    memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+                lock.lock();
+                try {
+                  if (memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
+                    memtableIdToPipeWALResourceMap.remove(memtableId);
+                  }
+                } finally {
+                  lock.unlock();
+                }
+              }
+            },
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            TimeUnit.MILLISECONDS);
+  }
+
+  public void pin(long memtableId, WALPipeHandler walPipeHandler) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap
+          .computeIfAbsent(memtableId, id -> new PipeWALResource(walPipeHandler))
+          .pin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void unpin(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (ttlCheckerFuture != null) {
+      ttlCheckerFuture.cancel(true);
+    }
+
+    for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+      final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+      lock.lock();
+      try {
+        memtableIdToPipeWALResourceMap.get(memtableId).close();
+        memtableIdToPipeWALResourceMap.remove(memtableId);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
index abdb4771a93..80333cf8ffb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -101,6 +101,10 @@ public class WALPipeHandler {
     }
   }
 
+  public long getMemTableId() {
+    return memTableId;
+  }
+
   public void setMemTableId(long memTableId) {
     this.memTableId = memTableId;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index b2441aa9d9f..ef86b0db285 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.TsFileWriter;


[iotdb] 01/05: reduce heartbeat frequency

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 31fa754dcb66f32edc9a23e2bb4b4c1d7888d1eb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 23 17:24:02 2023 +0800

    reduce heartbeat frequency
---
 .../apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 742b2230fa6..df0ea7d1fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
   private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
+  private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
+  private int executeOnceInvokedTimes;
+
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
       String taskID,
@@ -47,13 +50,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
+    executeOnceInvokedTimes = 0;
   }
 
   @Override
   protected synchronized boolean executeOnce() {
     try {
-      // TODO: reduce the frequency of heartbeat
-      outputPipeConnector.heartbeat();
+      if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) {
+        outputPipeConnector.heartbeat();
+      }
     } catch (Exception e) {
       throw new PipeConnectionException(
           "PipeConnector: failed to connect to the target system.", e);