You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/12 08:55:02 UTC
[iotdb] branch master updated: [IOTDB-4674] Reimplement settle by compaction (#8644)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5d78285d [IOTDB-4674] Reimplement settle by compaction (#8644)
9d5d78285d is described below
commit 9d5d78285d3178200cc916c7d1eca22f8b4fc08d
Author: shuwenwei <55...@users.noreply.github.com>
AuthorDate: Thu Jan 12 16:54:54 2023 +0800
[IOTDB-4674] Reimplement settle by compaction (#8644)
---
.../resources/tools/tsfile/settle-tsfile.bat | 62 ++++
.../resources/tools/tsfile/settle-tsfile.sh | 48 +++
.../db/engine/settle/SettleRequestHandler.java | 323 +++++++++++++++++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 7 +
.../tools/settle/TsFileSettleByCompactionTool.java | 123 ++++++++
.../db/engine/settle/SettleRequestHandlerTest.java | 162 +++++++++++
thrift-commons/src/main/thrift/common.thrift | 4 +
thrift/src/main/thrift/datanode.thrift | 2 +
8 files changed, 731 insertions(+)
diff --git a/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat b/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat
new file mode 100644
index 0000000000..1e4e9f744c
--- /dev/null
+++ b/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat
@@ -0,0 +1,62 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+
+@REM Launcher for TsFileSettleByCompactionTool: resolves IOTDB_HOME, builds the classpath from
+@REM the lib directory and runs the tool with the arguments given to this script.
+@echo off
+echo ````````````````````````````````````````````
+echo Starting Settling the TsFile By Compaction
+echo ````````````````````````````````````````````
+
+if "%OS%" == "Windows_NT" setlocal
+
+@REM Default IOTDB_HOME to the directory two levels above this script when it is not already set.
+pushd %~dp0..\..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
+popd
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileSettleByCompactionTool
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+set CLASSPATH="%IOTDB_HOME%\lib\*"
+
+goto okClasspath
+
+@REM Helper subroutine that appends %1 to CLASSPATH; nothing in this script calls it.
+:append
+set CLASSPATH=%CLASSPATH%;%1
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+@REM Run the tool, forwarding all command-line arguments (%*) unchanged.
+"%JAVA_HOME%\bin\java" -cp "%CLASSPATH%" %MAIN_CLASS% %*
+
+goto finally
+
+
+@REM Reached only when JAVA_HOME is not defined; execution then falls through to :finally.
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+ENDLOCAL
diff --git a/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh b/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh
new file mode 100644
index 0000000000..384a82cf94
--- /dev/null
+++ b/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+echo -----------------------------------------------
+echo Starting Settling the TsFile By Compaction
+echo -----------------------------------------------
+
+# Default IOTDB_HOME to the directory two levels above this script.
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
+fi
+
+# Prefer an executable java under JAVA_HOME; without JAVA_HOME rely on java from PATH.
+if [ -n "$JAVA_HOME" ]; then
+  for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+    if [ -x "$java" ]; then
+      JAVA="$java"
+      break
+    fi
+  done
+else
+  JAVA=java
+fi
+
+# JAVA_HOME may be set without containing an executable java; fail with a clear message
+# instead of trying to run an empty command.
+if [ -z "$JAVA" ]; then
+  echo "Unable to find java executable. Check JAVA_HOME and PATH environment variables." >&2
+  exit 1
+fi
+
+# Join every jar under lib with ':'. IOTDB_HOME is quoted so paths containing spaces work, and
+# the separator is only added between entries so the classpath has no empty leading entry
+# (an empty entry would put the current directory on the classpath).
+CLASSPATH=""
+for f in "${IOTDB_HOME}"/lib/*.jar; do
+  CLASSPATH="${CLASSPATH:+${CLASSPATH}:}$f"
+done
+
+MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileSettleByCompactionTool
+
+# Forward all arguments to the tool and propagate its exit status.
+"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java
new file mode 100644
index 0000000000..7b32559532
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SettleRequestHandler {
+ private static final Logger logger = LoggerFactory.getLogger(SettleRequestHandler.class);
+
+ private boolean testMode = false;
+
+ /** Returns whether test mode is on; see {@link #setTestMode(boolean)}. */
+ public boolean isTestMode() {
+ return testMode;
+ }
+
+ /**
+  * Switches test mode on or off. While it is on, {@code handleSettleRequest} returns a success
+  * status right after validation and does not submit a compaction task.
+  */
+ public void setTestMode(boolean testMode) {
+ this.testMode = testMode;
+ }
+
+ /** Returns the shared handler instance held by {@code SettleRequestHandlerHolder}. */
+ public static SettleRequestHandler getInstance() {
+ return SettleRequestHandlerHolder.INSTANCE;
+ }
+
+ public TSStatus handleSettleRequest(TSettleReq req) {
+ List<String> paths = req.getPaths();
+
+ SettleRequestContext context = new SettleRequestContext(paths);
+ TSStatus validationResult = context.validate();
+ if (validationResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return validationResult;
+ }
+ if (testMode) {
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+ List<TsFileResource> selectedTsFileResources = context.getTsFileResourcesByFileNames();
+ return context.submitCompactionTask(selectedTsFileResources);
+ }
+
+ private static class SettleRequestContext {
+ private ConsistentSettleInfo targetConsistentSettleInfo;
+
+ private boolean hasSeqFiles;
+ private boolean hasUnseqFiles;
+ private boolean hasModsFiles;
+ private List<String> paths;
+ private Set<String> tsFileNames;
+ private TsFileResourceList allTsFileResourceList;
+
+ private TsFileManager tsFileManager;
+ private IoTDBConfig config;
+
+ private SettleRequestContext(List<String> paths) {
+ this.paths = paths;
+ this.tsFileNames = new HashSet<>();
+ this.config = IoTDBDescriptor.getInstance().getConfig();
+ }
+
+ /**
+  * Checks that the requested files can be settled together by one inner space compaction and
+  * caches the state later steps need ({@code tsFileManager}, {@code allTsFileResourceList}).
+  *
+  * <p>The request is rejected when no path is given, when more paths are given than the
+  * configured max inner compaction candidate file number, when a file does not exist or its
+  * name cannot be parsed, when the files differ in data region, storage group, time partition
+  * or level, when sequence and unsequence files are mixed, when none of the files has a mods
+  * file, when the data region is unknown, when compaction is not allowed, or when the files are
+  * not adjacent closed files in the partition's resource list.
+  *
+  * @return a success status if every check passes, otherwise the status of the first failed check
+  */
+ private TSStatus validate() {
+   if (paths == null || paths.isEmpty()) {
+     return RpcUtils.getStatus(
+         TSStatusCode.ILLEGAL_PARAMETER, "The files to settle is not offered.");
+   }
+
+   int maxInnerCompactionCandidateFileNum = config.getMaxInnerCompactionCandidateFileNum();
+   if (paths.size() > maxInnerCompactionCandidateFileNum) {
+     // Report paths.size(): tsFileNames is still empty here because it is only filled by the
+     // loop below, so using it would always print 0.
+     return RpcUtils.getStatus(
+         TSStatusCode.UNSUPPORTED_OPERATION,
+         "Too many files offered, the limited count of system config is "
+             + maxInnerCompactionCandidateFileNum
+             + ", the input file count is "
+             + paths.size());
+   }
+
+   TSStatus validationResult;
+
+   for (String path : paths) {
+     File currentTsFile = new File(path);
+     if (!currentTsFile.exists()) {
+       return RpcUtils.getStatus(
+           TSStatusCode.PATH_NOT_EXIST, "The specified file does not exist in " + path);
+     }
+     // One mods file among all selected files is enough for the request to be useful.
+     File modsFile = new File(path + ModificationFile.FILE_SUFFIX);
+     hasModsFiles |= modsFile.exists();
+
+     ConsistentSettleInfo currentInfo = calculateConsistentInfo(currentTsFile);
+     if (!currentInfo.isValid) {
+       return RpcUtils.getStatus(
+           TSStatusCode.ILLEGAL_PATH, "The File Name of the TsFile is not valid: " + path);
+     }
+     // The first file defines the data region, storage group, partition and level that all
+     // other files must share.
+     if (this.targetConsistentSettleInfo == null) {
+       this.targetConsistentSettleInfo = currentInfo;
+     }
+
+     validationResult = targetConsistentSettleInfo.checkConsistency(currentInfo);
+     if (!isSuccess(validationResult)) {
+       return validationResult;
+     }
+
+     if (TsFileUtils.isSequence(currentTsFile)) {
+       hasSeqFiles = true;
+     } else {
+       hasUnseqFiles = true;
+     }
+     tsFileNames.add(currentTsFile.getName());
+     if (hasSeqFiles && hasUnseqFiles) {
+       return RpcUtils.getStatus(
+           TSStatusCode.UNSUPPORTED_OPERATION, "Settle by cross compaction is not allowed.");
+     }
+   }
+
+   if (!hasModsFiles) {
+     return RpcUtils.getStatus(
+         TSStatusCode.ILLEGAL_PARAMETER,
+         "Every selected TsFile does not contains the mods file.");
+   }
+   DataRegion dataRegion =
+       StorageEngine.getInstance()
+           .getDataRegion(new DataRegionId(targetConsistentSettleInfo.dataRegionId));
+   if (dataRegion == null) {
+     return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PATH, "DataRegion not exist");
+   }
+   tsFileManager = dataRegion.getTsFileManager();
+
+   validationResult = checkCompactionConfigs();
+   if (!isSuccess(validationResult)) {
+     return validationResult;
+   }
+
+   if (hasSeqFiles) {
+     allTsFileResourceList =
+         tsFileManager.getSequenceListByTimePartition(
+             targetConsistentSettleInfo.timePartitionId);
+   } else {
+     allTsFileResourceList =
+         tsFileManager.getUnsequenceListByTimePartition(
+             targetConsistentSettleInfo.timePartitionId);
+   }
+
+   return validateTsFileResources();
+ }
+
+ /**
+  * Collects the attributes of a TsFile that must match across all files of one settle request.
+  *
+  * @param tsFile the TsFile to inspect
+  * @return the collected info; {@code isValid} is false when the file name cannot be parsed by
+  *     {@link TsFileNameGenerator}, in which case {@code level} is left unset
+  */
+ private ConsistentSettleInfo calculateConsistentInfo(File tsFile) {
+   ConsistentSettleInfo info = new ConsistentSettleInfo();
+   info.dataRegionId = TsFileUtils.getDataRegionId(tsFile);
+   info.storageGroupName = TsFileUtils.getStorageGroup(tsFile);
+   info.timePartitionId = TsFileUtils.getTimePartition(tsFile);
+
+   try {
+     TsFileNameGenerator.TsFileName parsedName =
+         TsFileNameGenerator.getTsFileName(tsFile.getName());
+     info.level = parsedName.getInnerCompactionCnt();
+     info.isValid = true;
+   } catch (IOException e) {
+     // The file name does not follow the TsFile naming scheme.
+     info.isValid = false;
+   }
+   return info;
+ }
+
+ private boolean isSuccess(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ /**
+  * Verifies that compaction is allowed for the data region and enabled for the space (sequence
+  * or unsequence) the selected files belong to.
+  *
+  * @return a success status, or {@code UNSUPPORTED_OPERATION} naming the first failed condition
+  */
+ private TSStatus checkCompactionConfigs() {
+   String rejection = null;
+   if (!tsFileManager.isAllowCompaction()) {
+     rejection = "Compaction in this DataRegion is not allowed.";
+   } else if (hasSeqFiles && !config.isEnableSeqSpaceCompaction()) {
+     rejection = "Compaction in Seq Space is not enabled";
+   } else if (hasUnseqFiles && !config.isEnableUnseqSpaceCompaction()) {
+     rejection = "Compaction in Unseq Space is not enabled";
+   }
+
+   return rejection == null
+       ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+       : RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, rejection);
+ }
+
+ /**
+  * Walks the partition's resource list and checks that the selected files form one unbroken run
+  * of closed resources.
+  *
+  * @return a success status when the first run of selected files covers every selected name;
+  *     {@code ILLEGAL_PARAMETER} when a selected resource is not closed or the run is shorter
+  */
+ private TSStatus validateTsFileResources() {
+   int runLength = 0;
+   for (TsFileResource resource : allTsFileResourceList) {
+     File file = resource.getTsFile();
+     boolean selected = tsFileNames.contains(file.getName());
+     if (!selected) {
+       if (runLength > 0) {
+         // The first run of selected files has ended; anything later would leave a gap.
+         break;
+       }
+       continue;
+     }
+     if (resource.getStatus() != TsFileResourceStatus.CLOSED) {
+       return RpcUtils.getStatus(
+           TSStatusCode.ILLEGAL_PARAMETER, "The TsFile is not valid: " + file.getAbsolutePath());
+     }
+     runLength++;
+   }
+
+   if (runLength == tsFileNames.size()) {
+     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   }
+   return RpcUtils.getStatus(
+       TSStatusCode.ILLEGAL_PARAMETER, "Selected TsFiles are not continuous.");
+ }
+
+ private List<TsFileResource> getTsFileResourcesByFileNames() {
+ List<TsFileResource> selectedTsFileResources = new ArrayList<>(tsFileNames.size());
+ for (TsFileResource tsFileResource : allTsFileResourceList) {
+ if (tsFileNames.contains(tsFileResource.getTsFile().getName())) {
+ selectedTsFileResources.add(tsFileResource);
+ }
+ if (selectedTsFileResources.size() == tsFileNames.size()) {
+ break;
+ }
+ }
+ return selectedTsFileResources;
+ }
+
+ /**
+  * Wraps the given files in an {@link InnerSpaceCompactionTask} and hands it to the waiting
+  * queue of the {@link CompactionTaskManager}. The performer is chosen from the configuration
+  * according to whether the files are sequence or unsequence files.
+  *
+  * @param tsFileResources the resources to compact
+  * @return a success status if adding the task returns normally, or {@code COMPACTION_ERROR} if
+  *     the thread is interrupted while adding it
+  */
+ private TSStatus submitCompactionTask(List<TsFileResource> tsFileResources) {
+   ICompactionPerformer performer =
+       hasSeqFiles
+           ? config.getInnerSeqCompactionPerformer().createInstance()
+           : config.getInnerUnseqCompactionPerformer().createInstance();
+   AbstractCompactionTask task =
+       new InnerSpaceCompactionTask(
+           targetConsistentSettleInfo.timePartitionId,
+           tsFileManager,
+           tsFileResources,
+           hasSeqFiles,
+           performer,
+           CompactionTaskManager.currentTaskNum,
+           tsFileManager.getNextCompactionTaskId());
+   try {
+     CompactionTaskManager.getInstance().addTaskToWaitingQueue(task);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so the calling thread can still observe the interruption.
+     Thread.currentThread().interrupt();
+     // The trailing throwable makes SLF4J log the stack trace as well as the message.
+     logger.error(
+         "meet error when adding task-{} to compaction waiting queue: {}",
+         task.getSerialId(),
+         e.getMessage(),
+         e);
+     return RpcUtils.getStatus(
+         TSStatusCode.COMPACTION_ERROR, "meet error when submit settle task.");
+   }
+   return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+ }
+
+ private static class ConsistentSettleInfo {
+ private int dataRegionId;
+ private int level;
+ private String storageGroupName;
+ private long timePartitionId;
+ private boolean isValid;
+
+ private TSStatus checkConsistency(ConsistentSettleInfo other) {
+ if (this.dataRegionId != other.dataRegionId) {
+ return RpcUtils.getStatus(
+ TSStatusCode.ILLEGAL_PATH, "DataRegion of files is not consistent.");
+ }
+ if (!this.storageGroupName.equals(other.storageGroupName)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.ILLEGAL_PATH, "StorageGroup of files is not consistent.");
+ }
+ if (this.timePartitionId != other.timePartitionId) {
+ return RpcUtils.getStatus(
+ TSStatusCode.ILLEGAL_PATH, "TimePartition of files is not consistent.");
+ }
+ if (this.level != other.level) {
+ return RpcUtils.getStatus(
+ TSStatusCode.ILLEGAL_PARAMETER, "Level of files is not consistent.");
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+ }
+
+ private static class SettleRequestHandlerHolder {
+ private static final SettleRequestHandler INSTANCE = new SettleRequestHandler();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 956b4be9f2..a44774faa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -59,6 +60,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.settle.SettleRequestHandler;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
@@ -1112,6 +1114,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ /** Handles a settle RPC by delegating the request to the shared {@link SettleRequestHandler}. */
+ @Override
+ public TSStatus settle(TSettleReq req) throws TException {
+ return SettleRequestHandler.getInstance().handleSettleRequest(req);
+ }
+
@Override
public TSStatus loadConfiguration() throws TException {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java
new file mode 100644
index 0000000000..51eccb2ed2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.layered.TFramedTransport;
+
+import java.util.Arrays;
+
+public class TsFileSettleByCompactionTool {
+
+ private static final String HOST_ARGS = "h";
+ private static final String HOST_NAME = "host";
+
+ private static final String PORT_ARGS = "p";
+ private static final String PORT_NAME = "port";
+
+ private static final String FILE_PATH_ARGS = "f";
+ private static final String FILE_PATH_NAME = "file paths";
+
+ private static final String DEFAULT_HOST_VALUE = "127.0.0.1";
+ private static final String DEFAULT_PORT_VALUE = "9003";
+
+ /**
+  * Parses the command line, connects to the DataNode internal RPC service and sends one settle
+  * request for the given file paths, printing the outcome to standard output.
+  *
+  * @param args command-line arguments: {@code -h} host, {@code -p} port, {@code -f} file paths
+  * @throws TException if the connection cannot be opened or the RPC fails
+  */
+ public static void main(String[] args) throws TException {
+   Options commandLineOptions = createOptions();
+   CommandLineParser parser = new DefaultParser();
+   CommandLine commandLine;
+   try {
+     commandLine = parser.parse(commandLineOptions, args);
+   } catch (ParseException e) {
+     System.out.println("Parse command line args failed: " + e.getMessage());
+     return;
+   }
+
+   String hostValue = getArgOrDefault(commandLine, HOST_ARGS, DEFAULT_HOST_VALUE);
+   String portValue = getArgOrDefault(commandLine, PORT_ARGS, DEFAULT_PORT_VALUE);
+   int port;
+   try {
+     port = Integer.parseInt(portValue);
+   } catch (NumberFormatException e) {
+     // Report a bad port the same way as other argument errors instead of a stack trace.
+     System.out.println(
+         "Parse command line args failed: port must be an integer, but got " + portValue);
+     return;
+   }
+   String[] filePaths = commandLine.getOptionValues(FILE_PATH_ARGS);
+
+   TTransport transport = new TFramedTransport(new TSocket(hostValue, port));
+   try {
+     transport.open();
+     TProtocol protocol = new TBinaryProtocol(transport);
+     IDataNodeRPCService.Client.Factory clientFactory = new IDataNodeRPCService.Client.Factory();
+     IDataNodeRPCService.Client client = clientFactory.getClient(protocol);
+
+     TSettleReq tSettleReq = new TSettleReq();
+     tSettleReq.setPaths(Arrays.asList(filePaths));
+     TSStatus result = client.settle(tSettleReq);
+     if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       System.out.println("Add Settle Compaction Task Successfully");
+     } else {
+       System.out.println("Add settle compaction task failed with status code: " + result);
+     }
+   } finally {
+     // Always release the socket, including when open() or the RPC throws.
+     transport.close();
+   }
+ }
+
+ /**
+  * Builds the command-line options of the tool: an optional host ({@code -h}), an optional port
+  * ({@code -p}) and the required, space-separated list of file paths ({@code -f}).
+  *
+  * @return the options understood by {@code main}
+  */
+ private static Options createOptions() {
+   Options options = new Options();
+   Option host =
+       Option.builder(HOST_ARGS)
+           .argName(HOST_NAME)
+           .hasArg()
+           // Closing parenthesis added; the description used to end with an unbalanced "(".
+           .desc("Host Name (optional, default 127.0.0.1)")
+           .build();
+   options.addOption(host);
+
+   Option port =
+       Option.builder(PORT_ARGS)
+           .argName(PORT_NAME)
+           .hasArg()
+           .desc("Port (optional, default 9003)")
+           .build();
+   options.addOption(port);
+
+   Option filePaths =
+       Option.builder(FILE_PATH_ARGS)
+           .argName(FILE_PATH_NAME)
+           .hasArgs()
+           .valueSeparator(' ')
+           .desc("File Paths (required)")
+           .required()
+           .build();
+   options.addOption(filePaths);
+   return options;
+ }
+
+ /**
+  * Looks up the value of a command-line option.
+  *
+  * @param commandLine the parsed command line
+  * @param arg the option to look up
+  * @param defaultValue the value to use when the option has no value
+  * @return the option value, or {@code defaultValue} if there is none
+  */
+ private static String getArgOrDefault(CommandLine commandLine, String arg, String defaultValue) {
+   return commandLine.getOptionValue(arg, defaultValue);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java
new file mode 100644
index 0000000000..f849ac9f63
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegionTest;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SettleRequestHandlerTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private SettleRequestHandler reqHandler;
+ private String storageGroup = "root.sg.d1";
+ private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+ private String deviceId = "root.sg.d1";
+ private String measurementId = "s0";
+ private List<String> paths = new ArrayList<>();
+ private DataRegion dataRegion;
+
+ @Before
+ public void setUp()
+ throws DataRegionException, StartupException, IOException, StorageEngineException {
+ EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.envSetUp();
+ reqHandler = SettleRequestHandler.getInstance();
+ reqHandler.setTestMode(true);
+ dataRegion = new DummyDataRegion(systemDir, storageGroup);
+ StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion);
+ WALManager.getInstance().start();
+ FlushManager.getInstance().start();
+ }
+
+ /**
+  * Stops the services started in {@code setUp}, removes the test data region and cleans the test
+  * environment.
+  */
+ @After
+ public void tearDown() throws StorageEngineException, IOException {
+   // setUp switches the shared handler into test mode; undo that so the setting does not leak
+   // into other tests that use the same singleton.
+   SettleRequestHandler.getInstance().setTestMode(false);
+   WALManager.getInstance().stop();
+   FlushManager.getInstance().stop();
+   if (dataRegion != null) {
+     dataRegion.syncDeleteDataFiles();
+     StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
+   }
+   EnvironmentUtils.cleanEnv();
+   EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+   EnvironmentUtils.cleanEnv();
+ }
+
+ /**
+  * Exercises request validation in test mode: a valid request, disabled sequence compaction, the
+  * candidate file limit, non-adjacent files and a selection without any mods file.
+  */
+ @Test
+ public void testHandleSettleRequest()
+     throws IllegalPathException, IOException, WriteProcessException {
+   createTsFiles();
+   Assert.assertEquals(3, paths.size());
+
+   // 3 TsFile compaction
+   // The request keeps a reference to the paths list, so later changes to the list are seen by
+   // the following calls.
+   TSettleReq req = new TSettleReq();
+   req.setPaths(paths);
+   TSStatus result = reqHandler.handleSettleRequest(req);
+   Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.code);
+
+   // not enable compaction
+   boolean enableSeqSpaceCompaction = config.isEnableSeqSpaceCompaction();
+   config.setEnableSeqSpaceCompaction(false);
+   try {
+     result = reqHandler.handleSettleRequest(req);
+     Assert.assertEquals(TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(), result.code);
+   } finally {
+     // Restore the global config even if the assertion fails.
+     config.setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+   }
+
+   // compaction candidate file num
+   int maxInnerCompactionCandidateFileNum = config.getMaxInnerCompactionCandidateFileNum();
+   config.setMaxInnerCompactionCandidateFileNum(2);
+   try {
+     result = reqHandler.handleSettleRequest(req);
+     Assert.assertEquals(TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(), result.code);
+     String firstTsFilePath = paths.remove(0);
+     result = reqHandler.handleSettleRequest(req);
+     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.code);
+     paths.add(0, firstTsFilePath);
+   } finally {
+     config.setMaxInnerCompactionCandidateFileNum(maxInnerCompactionCandidateFileNum);
+   }
+
+   // not continuous
+   paths.remove(1);
+   result = reqHandler.handleSettleRequest(req);
+   Assert.assertEquals(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(), result.code);
+
+   // mods file not exist
+   paths.remove(0);
+   result = reqHandler.handleSettleRequest(req);
+   Assert.assertEquals(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(), result.code);
+ }
+
+ /**
+  * Writes three rounds of data into the data region and records the path of each working
+  * sequence TsFile in {@code paths}. The test expects this to yield three paths.
+  */
+ private void createTsFiles() throws IllegalPathException, WriteProcessException, IOException {
+ TSRecord record;
+ for (int i = 0; i < 3; i++) {
+ // Round i inserts three points with timestamps 3i+1 .. 3i+3.
+ for (int j = 1; j <= 3; j++) {
+ long timestamp = 3L * i + j;
+ record = new TSRecord(timestamp, deviceId);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(timestamp)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ }
+ // Remember the file path before flushing and closing the working processors.
+ for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) {
+ paths.add(tsFileProcessor.getTsFileResource().getTsFilePath());
+ tsFileProcessor.syncFlush();
+ }
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ // Delete the first point of the first two rounds only.
+ // NOTE(review): presumably this leaves a mods file next to the first two TsFiles and none
+ // next to the third, which the "mods file not exist" test case relies on - confirm.
+ if (i != 2) {
+ dataRegion.deleteByDevice(
+ new PartialPath(deviceId, measurementId), 3L * i + 1, 3L * i + 1, -1, null);
+ }
+ }
+ }
+
+ static class DummyDataRegion extends DataRegion {
+
+ DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
+ super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
+ }
+ }
+}
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index fecceb80a8..d26966668d 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -102,6 +102,10 @@ struct TFlushReq {
2: optional list<string> storageGroups
}
+// Request of the DataNode settle RPC; paths lists the TsFile paths to settle.
+struct TSettleReq {
+ 1: required list<string> paths
+}
+
// for node management
struct TSchemaNode {
1: required string nodeName
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 64c3e416c5..cf3fab6143 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -611,6 +611,8 @@ service IDataNodeRPCService {
common.TSStatus flush(common.TFlushReq req)
+ common.TSStatus settle(common.TSettleReq req)
+
common.TSStatus clearCache()
common.TSStatus loadConfiguration()