You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/12 08:55:02 UTC

[iotdb] branch master updated: [IOTDB-4674] Reimplement settle by compaction (#8644)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5d78285d [IOTDB-4674] Reimplement settle by compaction (#8644)
9d5d78285d is described below

commit 9d5d78285d3178200cc916c7d1eca22f8b4fc08d
Author: shuwenwei <55...@users.noreply.github.com>
AuthorDate: Thu Jan 12 16:54:54 2023 +0800

    [IOTDB-4674] Reimplement settle by compaction (#8644)
---
 .../resources/tools/tsfile/settle-tsfile.bat       |  62 ++++
 .../resources/tools/tsfile/settle-tsfile.sh        |  48 +++
 .../db/engine/settle/SettleRequestHandler.java     | 323 +++++++++++++++++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +
 .../tools/settle/TsFileSettleByCompactionTool.java | 123 ++++++++
 .../db/engine/settle/SettleRequestHandlerTest.java | 162 +++++++++++
 thrift-commons/src/main/thrift/common.thrift       |   4 +
 thrift/src/main/thrift/datanode.thrift             |   2 +
 8 files changed, 731 insertions(+)

diff --git a/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat b/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat
new file mode 100644
index 0000000000..1e4e9f744c
--- /dev/null
+++ b/server/src/assembly/resources/tools/tsfile/settle-tsfile.bat
@@ -0,0 +1,62 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM     http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+
+@REM Launches TsFileSettleByCompactionTool with every jar under %IOTDB_HOME%\lib
+@REM on the classpath. All command line arguments are forwarded to the tool.
+@echo off
+echo ````````````````````````````````````````````
+echo Starting Settling the TsFile By Compaction
+echo ````````````````````````````````````````````
+
+if "%OS%" == "Windows_NT" setlocal
+
+@REM Default IOTDB_HOME to the directory two levels above this script.
+pushd %~dp0..\..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
+popd
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileSettleByCompactionTool
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+set CLASSPATH="%IOTDB_HOME%\lib\*"
+
+goto okClasspath
+
+:append
+set CLASSPATH=%CLASSPATH%;%1
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+@REM CLASSPATH already carries its own surrounding quotes (see the "set" above),
+@REM so it must not be quoted again here: nested quotes cancel each other out and
+@REM an installation path containing spaces would be split into several arguments.
+"%JAVA_HOME%\bin\java" -cp %CLASSPATH% %MAIN_CLASS% %*
+
+goto finally
+
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+ENDLOCAL
diff --git a/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh b/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh
new file mode 100644
index 0000000000..384a82cf94
--- /dev/null
+++ b/server/src/assembly/resources/tools/tsfile/settle-tsfile.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Launches TsFileSettleByCompactionTool with every jar under ${IOTDB_HOME}/lib
+# on the classpath. All command line arguments are forwarded to the tool.
+echo -----------------------------------------------
+echo Starting Settling the TsFile By Compaction
+echo -----------------------------------------------
+
+# Default IOTDB_HOME to the directory two levels above this script.
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+# JAVA_HOME may be set but contain no executable java; fail with a clear message
+# instead of trying to execute an empty command name.
+if [ -z "$JAVA" ]; then
+    echo "Unable to find java executable. Check JAVA_HOME and PATH environment variables." >&2
+    exit 1
+fi
+
+# Join the jars with ':' without a leading separator: a leading ':' leaves an
+# empty classpath entry, which Java treats as the current working directory.
+# IOTDB_HOME is quoted so that installation paths containing spaces still glob.
+CLASSPATH=""
+for f in "${IOTDB_HOME}"/lib/*.jar; do
+  CLASSPATH="${CLASSPATH:+${CLASSPATH}:}$f"
+done
+
+MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileSettleByCompactionTool
+
+"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java
new file mode 100644
index 0000000000..7b32559532
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleRequestHandler.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SettleRequestHandler {
+  private static final Logger logger = LoggerFactory.getLogger(SettleRequestHandler.class);
+
+  /**
+   * When {@code true}, {@link #handleSettleRequest(TSettleReq)} returns success right after
+   * validation and does not submit a compaction task.
+   */
+  private boolean testMode = false;
+
+  /** Returns whether this handler only validates requests without submitting tasks. */
+  public boolean isTestMode() {
+    return testMode;
+  }
+
+  /** Enables or disables validation-only handling of settle requests. */
+  public void setTestMode(boolean testMode) {
+    this.testMode = testMode;
+  }
+
+  /** Returns the shared handler instance held by {@code SettleRequestHandlerHolder}. */
+  public static SettleRequestHandler getInstance() {
+    return SettleRequestHandlerHolder.INSTANCE;
+  }
+
+  public TSStatus handleSettleRequest(TSettleReq req) {
+    List<String> paths = req.getPaths();
+
+    SettleRequestContext context = new SettleRequestContext(paths);
+    TSStatus validationResult = context.validate();
+    if (validationResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return validationResult;
+    }
+    if (testMode) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+    List<TsFileResource> selectedTsFileResources = context.getTsFileResourcesByFileNames();
+    return context.submitCompactionTask(selectedTsFileResources);
+  }
+
+  private static class SettleRequestContext {
+    // Info of the first validated file; every other file is compared against it.
+    private ConsistentSettleInfo targetConsistentSettleInfo;
+
+    // Flags accumulated while validate() walks the requested paths.
+    private boolean hasSeqFiles;
+    private boolean hasUnseqFiles;
+    private boolean hasModsFiles;
+    // Paths exactly as they were passed in the request.
+    private List<String> paths;
+    // File names (without directories) of the requested paths, filled by validate().
+    private Set<String> tsFileNames;
+    // Resource list of the target time partition (seq or unseq), set by validate().
+    private TsFileResourceList allTsFileResourceList;
+
+    // TsFileManager of the target DataRegion, set by validate().
+    private TsFileManager tsFileManager;
+    private IoTDBConfig config;
+
+    /** Creates a context for the given request paths; no validation happens here. */
+    private SettleRequestContext(List<String> paths) {
+      this.paths = paths;
+      this.tsFileNames = new HashSet<>();
+      this.config = IoTDBDescriptor.getInstance().getConfig();
+    }
+
+    /**
+     * Checks that the requested files can be settled together by one inner space compaction.
+     *
+     * <p>The checks run in this order: paths are offered; their count does not exceed the
+     * configured candidate limit; every file exists, has a parsable TsFile name and agrees with
+     * the first file on the compared properties; sequence and unsequence files are not mixed; at
+     * least one file has a mods file; the DataRegion exists; compaction is enabled; and the files
+     * are closed and continuous in the resource list of their time partition.
+     *
+     * <p>As a side effect this fills {@code tsFileNames}, the seq/unseq/mods flags, {@code
+     * tsFileManager} and {@code allTsFileResourceList}, which later steps rely on.
+     *
+     * @return a success status, or the status describing the first failed check
+     */
+    private TSStatus validate() {
+      if (paths == null || paths.isEmpty()) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PARAMETER, "The files to settle is not offered.");
+      }
+
+      int maxInnerCompactionCandidateFileNum = config.getMaxInnerCompactionCandidateFileNum();
+      if (paths.size() > maxInnerCompactionCandidateFileNum) {
+        // Report paths.size(): tsFileNames is still empty at this point, so using its size
+        // would always print 0 as the input file count.
+        return RpcUtils.getStatus(
+            TSStatusCode.UNSUPPORTED_OPERATION,
+            "Too many files offered, the limited count of system config is "
+                + maxInnerCompactionCandidateFileNum
+                + ", the input file count is "
+                + paths.size());
+      }
+
+      TSStatus validationResult;
+
+      for (String path : paths) {
+        File currentTsFile = new File(path);
+        if (!currentTsFile.exists()) {
+          return RpcUtils.getStatus(
+              TSStatusCode.PATH_NOT_EXIST, "The specified file does not exist in " + path);
+        }
+        File modsFile = new File(path + ModificationFile.FILE_SUFFIX);
+        hasModsFiles |= modsFile.exists();
+
+        ConsistentSettleInfo currentInfo = calculateConsistentInfo(currentTsFile);
+        if (!currentInfo.isValid) {
+          return RpcUtils.getStatus(
+              TSStatusCode.ILLEGAL_PATH, "The File Name of the TsFile is not valid: " + path);
+        }
+        // The first file defines the target every following file has to match.
+        if (this.targetConsistentSettleInfo == null) {
+          this.targetConsistentSettleInfo = currentInfo;
+        }
+
+        validationResult = targetConsistentSettleInfo.checkConsistency(currentInfo);
+        if (!isSuccess(validationResult)) {
+          return validationResult;
+        }
+
+        if (TsFileUtils.isSequence(currentTsFile)) {
+          hasSeqFiles = true;
+        } else {
+          hasUnseqFiles = true;
+        }
+        tsFileNames.add(currentTsFile.getName());
+        if (hasSeqFiles && hasUnseqFiles) {
+          return RpcUtils.getStatus(
+              TSStatusCode.UNSUPPORTED_OPERATION, "Settle by cross compaction is not allowed.");
+        }
+      }
+
+      if (!hasModsFiles) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PARAMETER,
+            "Every selected TsFile does not contains the mods file.");
+      }
+      DataRegion dataRegion =
+          StorageEngine.getInstance()
+              .getDataRegion(new DataRegionId(targetConsistentSettleInfo.dataRegionId));
+      if (dataRegion == null) {
+        return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PATH, "DataRegion not exist");
+      }
+      tsFileManager = dataRegion.getTsFileManager();
+
+      validationResult = checkCompactionConfigs();
+      if (!isSuccess(validationResult)) {
+        return validationResult;
+      }
+
+      if (hasSeqFiles) {
+        allTsFileResourceList =
+            tsFileManager.getSequenceListByTimePartition(
+                targetConsistentSettleInfo.timePartitionId);
+      } else {
+        allTsFileResourceList =
+            tsFileManager.getUnsequenceListByTimePartition(
+                targetConsistentSettleInfo.timePartitionId);
+      }
+
+      return validateTsFileResources();
+    }
+
+    /**
+     * Collects the properties of a TsFile that must be identical across all files of one settle
+     * request. The result is marked invalid when the file name cannot be parsed as a TsFile name.
+     *
+     * @param tsFile the TsFile to inspect
+     * @return the collected info; {@code level} is only set when {@code isValid} is true
+     */
+    private ConsistentSettleInfo calculateConsistentInfo(File tsFile) {
+      ConsistentSettleInfo info = new ConsistentSettleInfo();
+      info.dataRegionId = TsFileUtils.getDataRegionId(tsFile);
+      info.storageGroupName = TsFileUtils.getStorageGroup(tsFile);
+      info.timePartitionId = TsFileUtils.getTimePartition(tsFile);
+
+      try {
+        info.level = TsFileNameGenerator.getTsFileName(tsFile.getName()).getInnerCompactionCnt();
+        info.isValid = true;
+      } catch (IOException e) {
+        // The file name does not follow the TsFile naming pattern.
+        info.isValid = false;
+      }
+      return info;
+    }
+
+    /** Returns true when the given status carries the SUCCESS_STATUS code. */
+    private boolean isSuccess(TSStatus status) {
+      int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      return successCode == status.getCode();
+    }
+
+    /**
+     * Verifies that compaction is allowed for the target DataRegion and enabled for the space
+     * (sequence or unsequence) the requested files belong to.
+     *
+     * @return a success status, or UNSUPPORTED_OPERATION naming the first switch that is off
+     */
+    private TSStatus checkCompactionConfigs() {
+      String rejection = null;
+      if (!tsFileManager.isAllowCompaction()) {
+        rejection = "Compaction in this DataRegion is not allowed.";
+      } else if (hasSeqFiles && !config.isEnableSeqSpaceCompaction()) {
+        rejection = "Compaction in Seq Space is not enabled";
+      } else if (hasUnseqFiles && !config.isEnableUnseqSpaceCompaction()) {
+        rejection = "Compaction in Unseq Space is not enabled";
+      }
+
+      if (rejection != null) {
+        return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, rejection);
+      }
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+
+    /**
+     * Walks the resource list of the target time partition and checks that the requested files
+     * form one uninterrupted run of CLOSED resources.
+     *
+     * @return a success status, or ILLEGAL_PARAMETER when a requested file is not closed or the
+     *     requested files are not adjacent in the list
+     */
+    private TSStatus validateTsFileResources() {
+      int matched = 0;
+      for (TsFileResource resource : allTsFileResourceList) {
+        File file = resource.getTsFile();
+        boolean requested = tsFileNames.contains(file.getName());
+        if (!requested) {
+          if (matched != 0) {
+            // The run of requested files has ended; anything after it would be a gap.
+            break;
+          }
+          continue;
+        }
+        if (resource.getStatus() != TsFileResourceStatus.CLOSED) {
+          return RpcUtils.getStatus(
+              TSStatusCode.ILLEGAL_PARAMETER, "The TsFile is not valid: " + file.getAbsolutePath());
+        }
+        matched++;
+      }
+
+      boolean allFound = matched == tsFileNames.size();
+      return allFound
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : RpcUtils.getStatus(
+              TSStatusCode.ILLEGAL_PARAMETER, "Selected TsFiles are not continuous.");
+    }
+
+    /**
+     * Picks, in list order, the resources of the target time partition whose file names were
+     * requested. Iteration stops as soon as every requested name has been matched.
+     *
+     * @return the matching resources
+     */
+    private List<TsFileResource> getTsFileResourcesByFileNames() {
+      int expected = tsFileNames.size();
+      List<TsFileResource> matches = new ArrayList<>(expected);
+      for (TsFileResource candidate : allTsFileResourceList) {
+        String fileName = candidate.getTsFile().getName();
+        if (tsFileNames.contains(fileName)) {
+          matches.add(candidate);
+        }
+        if (matches.size() == expected) {
+          break;
+        }
+      }
+      return matches;
+    }
+
+    /**
+     * Builds an inner space compaction task over the given resources and adds it to the
+     * compaction waiting queue. The performer is taken from the seq or unseq inner compaction
+     * configuration, depending on the kind of files that were validated.
+     *
+     * @param tsFileResources the resources to compact
+     * @return a success status, or COMPACTION_ERROR when queueing the task was interrupted
+     */
+    private TSStatus submitCompactionTask(List<TsFileResource> tsFileResources) {
+      ICompactionPerformer performer =
+          hasSeqFiles
+              ? config.getInnerSeqCompactionPerformer().createInstance()
+              : config.getInnerUnseqCompactionPerformer().createInstance();
+      AbstractCompactionTask task =
+          new InnerSpaceCompactionTask(
+              targetConsistentSettleInfo.timePartitionId,
+              tsFileManager,
+              tsFileResources,
+              hasSeqFiles,
+              performer,
+              CompactionTaskManager.currentTaskNum,
+              tsFileManager.getNextCompactionTaskId());
+      try {
+        CompactionTaskManager.getInstance().addTaskToWaitingQueue(task);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the calling thread can still observe the interruption.
+        Thread.currentThread().interrupt();
+        // Passing the exception as the last argument keeps its stack trace in the log.
+        logger.error(
+            "meet error when adding task-{} to compaction waiting queue: {}",
+            task.getSerialId(),
+            e.getMessage(),
+            e);
+        return RpcUtils.getStatus(
+            TSStatusCode.COMPACTION_ERROR, "meet error when submit settle task.");
+      }
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+  }
+
+  private static class ConsistentSettleInfo {
+    private int dataRegionId;
+    private int level;
+    private String storageGroupName;
+    private long timePartitionId;
+    private boolean isValid;
+
+    private TSStatus checkConsistency(ConsistentSettleInfo other) {
+      if (this.dataRegionId != other.dataRegionId) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PATH, "DataRegion of files is not consistent.");
+      }
+      if (!this.storageGroupName.equals(other.storageGroupName)) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PATH, "StorageGroup of files is not consistent.");
+      }
+      if (this.timePartitionId != other.timePartitionId) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PATH, "TimePartition of files is not consistent.");
+      }
+      if (this.level != other.level) {
+        return RpcUtils.getStatus(
+            TSStatusCode.ILLEGAL_PARAMETER, "Level of files is not consistent.");
+      }
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+  }
+
+  /** Holds the single SettleRequestHandler instance returned by getInstance(). */
+  private static class SettleRequestHandlerHolder {
+    private static final SettleRequestHandler INSTANCE = new SettleRequestHandler();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 956b4be9f2..a44774faa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -59,6 +60,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.settle.SettleRequestHandler;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
@@ -1112,6 +1114,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  /** Delegates the settle request to the shared SettleRequestHandler and returns its status. */
+  @Override
+  public TSStatus settle(TSettleReq req) throws TException {
+    return SettleRequestHandler.getInstance().handleSettleRequest(req);
+  }
+
   @Override
   public TSStatus loadConfiguration() throws TException {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java
new file mode 100644
index 0000000000..51eccb2ed2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileSettleByCompactionTool.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.layered.TFramedTransport;
+
+import java.util.Arrays;
+
+public class TsFileSettleByCompactionTool {
+
+  // Short option name and argument display name of the host option.
+  private static final String HOST_ARGS = "h";
+  private static final String HOST_NAME = "host";
+
+  // Short option name and argument display name of the port option.
+  private static final String PORT_ARGS = "p";
+  private static final String PORT_NAME = "port";
+
+  // Short option name and argument display name of the (required) file paths option.
+  private static final String FILE_PATH_ARGS = "f";
+  private static final String FILE_PATH_NAME = "file paths";
+
+  // Values used when the host or port option is not given on the command line.
+  private static final String DEFAULT_HOST_VALUE = "127.0.0.1";
+  private static final String DEFAULT_PORT_VALUE = "9003";
+
+  /**
+   * Parses the command line, connects to the DataNode RPC service and asks it to settle the given
+   * TsFiles. The outcome is printed to standard output.
+   *
+   * @param args command line arguments: optional -h host, optional -p port, required -f paths
+   * @throws TException if the connection cannot be opened or the RPC call fails
+   */
+  public static void main(String[] args) throws TException {
+    Options commandLineOptions = createOptions();
+    CommandLineParser parser = new DefaultParser();
+    CommandLine commandLine;
+    try {
+      commandLine = parser.parse(commandLineOptions, args);
+    } catch (ParseException e) {
+      System.out.println("Parse command line args failed: " + e.getMessage());
+      return;
+    }
+
+    String hostValue = getArgOrDefault(commandLine, HOST_ARGS, DEFAULT_HOST_VALUE);
+    String portValue = getArgOrDefault(commandLine, PORT_ARGS, DEFAULT_PORT_VALUE);
+    int port;
+    try {
+      port = Integer.parseInt(portValue);
+    } catch (NumberFormatException e) {
+      // Report a malformed port like any other bad argument instead of crashing with a trace.
+      System.out.println("Parse command line args failed: invalid port " + portValue);
+      return;
+    }
+    String[] filePaths = commandLine.getOptionValues(FILE_PATH_ARGS);
+
+    TTransport transport = new TFramedTransport(new TSocket(hostValue, port));
+    try {
+      transport.open();
+      TProtocol protocol = new TBinaryProtocol(transport);
+      IDataNodeRPCService.Client.Factory clientFactory = new IDataNodeRPCService.Client.Factory();
+      IDataNodeRPCService.Client client = clientFactory.getClient(protocol);
+
+      TSettleReq tSettleReq = new TSettleReq();
+      tSettleReq.setPaths(Arrays.asList(filePaths));
+      TSStatus result = client.settle(tSettleReq);
+      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        System.out.println("Add Settle Compaction Task Successfully");
+      } else {
+        System.out.println("Add settle compaction task failed with status code: " + result);
+      }
+    } finally {
+      // Always release the socket, including when open() or the RPC call throws.
+      transport.close();
+    }
+  }
+
+  /**
+   * Declares the command line options of the tool: optional host (-h), optional port (-p) and the
+   * required, space separated list of file paths (-f).
+   *
+   * @return the option set used to parse the command line
+   */
+  private static Options createOptions() {
+    Options options = new Options();
+    Option host =
+        Option.builder(HOST_ARGS)
+            .argName(HOST_NAME)
+            .hasArg()
+            // Closing parenthesis added; the original description was left unbalanced.
+            .desc("Host Name (optional, default 127.0.0.1)")
+            .build();
+    options.addOption(host);
+
+    Option port =
+        Option.builder(PORT_ARGS)
+            .argName(PORT_NAME)
+            .hasArg()
+            .desc("Port (optional, default 9003)")
+            .build();
+    options.addOption(port);
+
+    Option filePaths =
+        Option.builder(FILE_PATH_ARGS)
+            .argName(FILE_PATH_NAME)
+            .hasArgs()
+            .valueSeparator(' ')
+            .desc("File Paths (required)")
+            .required()
+            .build();
+    options.addOption(filePaths);
+    return options;
+  }
+
+  /** Returns the value given for {@code arg}, or {@code defaultValue} when the option is absent. */
+  private static String getArgOrDefault(CommandLine commandLine, String arg, String defaultValue) {
+    return commandLine.getOptionValue(arg, defaultValue);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java
new file mode 100644
index 0000000000..f849ac9f63
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/settle/SettleRequestHandlerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.settle;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.DataRegionTest;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SettleRequestHandlerTest {
+  // Global configuration; the test temporarily changes compaction settings on it.
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private SettleRequestHandler reqHandler;
+  private String storageGroup = "root.sg.d1";
+  private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+  private String deviceId = "root.sg.d1";
+  private String measurementId = "s0";
+  // Paths of the TsFiles produced by createTsFiles(), in creation order.
+  private List<String> paths = new ArrayList<>();
+  private DataRegion dataRegion;
+
+  /**
+   * Resets the test environment, switches the shared handler to test mode, registers a dummy
+   * DataRegion under id 0 and starts the WAL and flush managers.
+   */
+  @Before
+  public void setUp()
+      throws DataRegionException, StartupException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.envSetUp();
+    reqHandler = SettleRequestHandler.getInstance();
+    // Test mode makes the handler validate requests without submitting compaction tasks.
+    reqHandler.setTestMode(true);
+    dataRegion = new DummyDataRegion(systemDir, storageGroup);
+    StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion);
+    WALManager.getInstance().start();
+    FlushManager.getInstance().start();
+  }
+
+  /**
+   * Stops the managers started in setUp, removes the dummy DataRegion and its files, cleans the
+   * test environment and switches the shared handler back to normal mode.
+   */
+  @After
+  public void tearDown() throws StorageEngineException, IOException {
+    // The handler is a process-wide singleton: leave test mode so that code running later in the
+    // same JVM submits real compaction tasks again. getInstance() is used instead of the field
+    // because setUp may have failed before the field was assigned.
+    SettleRequestHandler.getInstance().setTestMode(false);
+    WALManager.getInstance().stop();
+    FlushManager.getInstance().stop();
+    if (dataRegion != null) {
+      dataRegion.syncDeleteDataFiles();
+      StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
+    }
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Exercises request validation: a valid three-file request, disabled seq compaction, the
+   * candidate file limit, non-continuous files and a selection without any mods file.
+   */
+  @Test
+  public void testHandleSettleRequest()
+      throws IllegalPathException, IOException, WriteProcessException {
+    createTsFiles();
+    Assert.assertEquals(3, paths.size());
+
+    // 3 TsFile compaction
+    TSettleReq req = new TSettleReq();
+    req.setPaths(paths);
+    TSStatus result = reqHandler.handleSettleRequest(req);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.code);
+
+    // not enable compaction; the global config is restored even if the assertion fails
+    config.setEnableSeqSpaceCompaction(false);
+    try {
+      result = reqHandler.handleSettleRequest(req);
+      Assert.assertEquals(TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(), result.code);
+    } finally {
+      config.setEnableSeqSpaceCompaction(true);
+    }
+
+    // compaction candidate file num; the global config is restored even if an assertion fails
+    int maxInnerCompactionCandidateFileNum = config.getMaxInnerCompactionCandidateFileNum();
+    config.setMaxInnerCompactionCandidateFileNum(2);
+    try {
+      result = reqHandler.handleSettleRequest(req);
+      Assert.assertEquals(TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(), result.code);
+      String firstTsFilePath = paths.remove(0);
+      result = reqHandler.handleSettleRequest(req);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.code);
+      paths.add(0, firstTsFilePath);
+    } finally {
+      config.setMaxInnerCompactionCandidateFileNum(maxInnerCompactionCandidateFileNum);
+    }
+
+    // not continuous
+    paths.remove(1);
+    result = reqHandler.handleSettleRequest(req);
+    Assert.assertEquals(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(), result.code);
+
+    // mods file not exist
+    paths.remove(0);
+    result = reqHandler.handleSettleRequest(req);
+    Assert.assertEquals(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(), result.code);
+  }
+
+  /**
+   * Writes three rounds of three INT32 points (timestamps 3*i+1 .. 3*i+3) into the data region.
+   * After each round the paths of the working sequence TsFiles are recorded in {@code paths} and
+   * all working processors are flushed and closed. After the first two rounds a deletion at
+   * timestamp 3*i+1 is issued; presumably this is what creates the mods files for the first two
+   * TsFiles and leaves the third one without one (see the last case of the test).
+   */
+  private void createTsFiles() throws IllegalPathException, WriteProcessException, IOException {
+    TSRecord record;
+    for (int i = 0; i < 3; i++) {
+      for (int j = 1; j <= 3; j++) {
+        long timestamp = 3L * i + j;
+        record = new TSRecord(timestamp, deviceId);
+        record.addTuple(
+            DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(timestamp)));
+        dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+      }
+      // Remember the path of each file being written before it is closed.
+      for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) {
+        paths.add(tsFileProcessor.getTsFileResource().getTsFilePath());
+        tsFileProcessor.syncFlush();
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      // No deletion after the last round.
+      if (i != 2) {
+        dataRegion.deleteByDevice(
+            new PartialPath(deviceId, measurementId), 3L * i + 1, 3L * i + 1, -1, null);
+      }
+    }
+  }
+
+  /** DataRegion with the fixed region id "0" and a direct flush policy, used by this test. */
+  static class DummyDataRegion extends DataRegion {
+
+    DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
+      super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
+    }
+  }
+}
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index fecceb80a8..d26966668d 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -102,6 +102,10 @@ struct TFlushReq {
    2: optional list<string> storageGroups
 }
 
+// Request of the settle RPC: the paths of the TsFiles to settle.
+struct TSettleReq {
+   1: required list<string> paths
+}
+
 // for node management
 struct TSchemaNode {
   1: required string nodeName
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 64c3e416c5..cf3fab6143 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -611,6 +611,8 @@ service IDataNodeRPCService {
 
   common.TSStatus flush(common.TFlushReq req)
 
+  common.TSStatus settle(common.TSettleReq req)
+
   common.TSStatus clearCache()
 
   common.TSStatus loadConfiguration()