You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/14 09:18:20 UTC

[rocketmq] branch develop updated: [ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)

This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fcfe26e44 [ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)
fcfe26e44 is described below

commit fcfe26e4443dd24b1055899266d1bd81060ee118
Author: cnScarb <jj...@163.com>
AuthorDate: Tue Jun 14 17:18:14 2022 +0800

    [ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)
    
    * [ISSUE #4037] Merge with origin
    
    * [ISSUE #4048] Add assertions for DeleteExpiredCommitLogSubCommandTest
---
 .../broker/processor/AdminBrokerProcessor.java     | 12 +++
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 16 ++++
 .../rocketmq/common/protocol/RequestCode.java      |  2 +
 docs/cn/operation.md                               | 30 +++++++-
 docs/en/CLITools.md                                | 50 ++++++++----
 docs/en/operation.md                               | 20 +++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    | 12 +++
 .../tools/admin/DefaultMQAdminExtImpl.java         | 36 +++++++++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  6 ++
 .../rocketmq/tools/command/MQAdminStartup.java     |  2 +
 .../broker/DeleteExpiredCommitLogSubCommand.java   | 88 ++++++++++++++++++++++
 .../DeleteExpiredCommitLogSubCommandTest.java      | 74 ++++++++++++++++++
 12 files changed, 332 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 24f52f54a..dba97200c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,6 +212,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 return this.getSystemTopicListFromBroker(ctx, request);
             case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
                 return this.cleanExpiredConsumeQueue();
+            case RequestCode.DELETE_EXPIRED_COMMITLOG:
+                return this.deleteExpiredCommitLog();
             case RequestCode.CLEAN_UNUSED_TOPIC:
                 return this.cleanUnusedTopic();
             case RequestCode.GET_CONSUMER_RUNNING_INFO:
@@ -1187,6 +1189,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    public RemotingCommand deleteExpiredCommitLog() {
+        log.warn("invoke deleteExpiredCommitLog start.");
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        brokerController.getMessageStore().executeDeleteFilesManually();
+        log.warn("invoke deleteExpiredCommitLog end.");
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
     public RemotingCommand cleanUnusedTopic() {
         log.warn("invoke cleanUnusedTopic start.");
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 73cbb887f..245195f07 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1866,6 +1866,22 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
+    public boolean deleteExpiredCommitLog(final String addr, long timeoutMillis) throws MQClientException,
+        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_EXPIRED_COMMITLOG, null);
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+            request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return true;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
     public boolean cleanUnusedTopicByAddr(final String addr,
         long timeoutMillis) throws MQClientException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 24c072bc8..9912382ff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -192,4 +192,6 @@ public class RequestCode {
     public static final int ADD_WRITE_PERM_OF_BROKER = 327;
 
     public static final int GET_ALL_PRODUCER_INFO = 328;
+
+    public static final int DELETE_EXPIRED_COMMITLOG = 329;
 }
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
index 692df2cca..737fd2527 100644
--- a/docs/cn/operation.md
+++ b/docs/cn/operation.md
@@ -566,6 +566,14 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
   <td class=xl67 width=87 style='width:65pt'>-b</td>
   <td class=xl68 width=87 style='width:65pt'>BrokerName</td>
  </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
+  <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+  <td class=xl68 width=87 style='width:65pt'>打印帮助</td>
+ </tr>
  <tr height=57 style='height:43.0pt'>
    <td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
    height:103.0pt;border-top:none;width:143pt'>addWritePerm</td>
@@ -573,7 +581,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
    border-top:none;width:65pt'>从NameServer上添加 Broker写权限</td>
    <td class=xl67 width=87 style='width:65pt'>-b</td>
    <td class=xl68 width=87 style='width:65pt'>BrokerName</td>
-  </tr>
+ </tr>
  <tr height=57 style='height:43.0pt'>
   <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
   <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
@@ -602,6 +610,26 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
   <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
   <td class=xl68 width=87 style='width:65pt'>集群名称</td>
  </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+  height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+  <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
+  border-top:none;width:65pt'>清理Broker上过期的CommitLog文件,Broker最多会执行20次删除操作,每次最多删除10个文件</td>
+  <td class=xl67 width=87 style='width:65pt'>-n</td>
+  <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+  <td class=xl68 width=87 style='width:65pt'>打印帮助</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+  <td class=xl68 width=87 style='width:65pt'>Broker 地址,地址为ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+  <td class=xl68 width=87 style='width:65pt'>集群名称</td>
+ </tr>
  <tr height=88 style='mso-height-source:userset;height:66.0pt'>
   <td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
   height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
diff --git a/docs/en/CLITools.md b/docs/en/CLITools.md
index 41ad3aeac..208ae44f3 100644
--- a/docs/en/CLITools.md
+++ b/docs/en/CLITools.md
@@ -184,15 +184,15 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td height=23 class=xl65 width=149 style='height:17.0pt;width:112pt'>-t</td>
-  <td class=xl66 width=159 style='width:119pt'>topic,key</td>
+  <td class=xl66 width=159 style='width:119pt'>topic, key</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl65 width=149 style='height:29.0pt;width:112pt'>-v</td>
-  <td class=xl66 width=159 style='width:119pt'>orderConf,value</td>
+  <td class=xl66 width=159 style='width:119pt'>orderConf, value</td>
  </tr>
  <tr height=57 style='height:43.0pt'>
   <td height=57 class=xl65 width=149 style='height:43.0pt;width:112pt'>-m</td>
-  <td class=xl66 width=159 style='width:119pt'>method,available values include get, put, delete</td>
+  <td class=xl66 width=159 style='width:119pt'>method, available values include get, put, delete</td>
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td rowspan=4 height=198 class=xl68 width=163 style='border-bottom:1.0pt;
@@ -277,7 +277,7 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl65 width=177 style='height:29.0pt;width:133pt'>-i</td>
-  <td class=xl66 width=185 style='width:139pt'>Print interval,unit basis is seconds</td>
+  <td class=xl66 width=185 style='width:139pt'>Print interval, unit basis is seconds</td>
  </tr>
  <tr height=95 style='height:71.0pt'>
   <td rowspan=8 height=391 class=xl67 width=177 style='border-bottom:1.0pt;
@@ -285,11 +285,11 @@ Before introducing the mqadmin management tool, the following points need to be
   <td rowspan=8 class=xl70 width=175 style='border-bottom:1.0pt;
   border-top:none;width:131pt'>Send message to detect each broker RT of the cluster.the message send to ${BrokerName} Topic</td>
   <td class=xl65 width=177 style='width:133pt'>-a</td>
-  <td class=xl66 width=185 style='width:139pt'>amount,total number per probe,RT = Total time/amount</td>
+  <td class=xl66 width=185 style='width:139pt'>amount, total number per probe, RT = Total time/amount</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl65 width=177 style='height:29.0pt;width:133pt'>-s</td>
-  <td class=xl66 width=185 style='width:139pt'>Message size,unit basis is B</td>
+  <td class=xl66 width=185 style='width:139pt'>Message size, unit basis is B</td>
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td height=23 class=xl65 width=177 style='height:17.0pt;width:133pt'>-c</td>
@@ -446,7 +446,27 @@ Before introducing the mqadmin management tool, the following points need to be
   <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
   height:120.0pt;border-top:none;width:143pt'>cleanExpiredCQ</td>
   <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
-  border-top:none;width:65pt'>Clean up expired consume Queue on broker,An expired queue may be generated if the number of columns is reduced manually</td>
+  border-top:none;width:65pt'>Clean up expired consume Queue on broker, An expired queue may be generated if the number of columns is reduced manually</td>
+  <td class=xl67 width=87 style='width:65pt'>-n</td>
+  <td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+  <td class=xl68 width=87 style='width:65pt'>Print help information</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+  <td class=xl68 width=87 style='width:65pt'>Declare the address of the broker and format as ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+  <td class=xl68 width=87 style='width:65pt'>Used to specify the name of the cluster</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+  height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+  <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
+  border-top:none;width:65pt'>Clean up expired CommitLog files on broker. A maximum of 20 deletion operations can be performed, and a maximum of 10 files can be deleted each time.</td>
   <td class=xl67 width=87 style='width:65pt'>-n</td>
   <td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
  </tr>
@@ -466,7 +486,7 @@ Before introducing the mqadmin management tool, the following points need to be
   <td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
   height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
   <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
-  border-top:none;width:65pt'>Clean up unused topic on broker and release topic's consume Queue from memory,If the topic is removed manually, an unused topic will be generated</td>
+  border-top:none;width:65pt'>Clean up unused topic on broker and release topic's consume Queue from memory, If the topic is removed manually, an unused topic will be generated</td>
   <td class=xl67 width=87 style='width:65pt'>-n</td>
   <td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
  </tr>
@@ -496,11 +516,11 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=57 style='height:43.0pt'>
   <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
-  <td class=xl68 width=87 style='width:65pt'>brokerName,note that this is not broker's address</td>
+  <td class=xl68 width=87 style='width:65pt'>brokerName, note that this is not broker's address</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
-  <td class=xl68 width=87 style='width:65pt'>Message size,the unit of account is B</td>
+  <td class=xl68 width=87 style='width:65pt'>Message size, the unit of account is B</td>
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
@@ -660,7 +680,7 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-p</td>
-  <td class=xl68 width=87 style='width:65pt'>body,message body</td>
+  <td class=xl68 width=87 style='width:65pt'>body, message body</td>
  </tr>
  <tr height=23 style='height:17.0pt'>
   <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-k</td>
@@ -740,11 +760,11 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-c</td>
-  <td class=xl68 width=87 style='width:65pt'>Character set,for example UTF-8</td>
+  <td class=xl68 width=87 style='width:65pt'>Character set, for example UTF-8</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
-  <td class=xl68 width=87 style='width:65pt'>subExpress,filter expression</td>
+  <td class=xl68 width=87 style='width:65pt'>subExpress, filter expression</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-b</td>
@@ -784,11 +804,11 @@ Before introducing the mqadmin management tool, the following points need to be
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-c</td>
-  <td class=xl68 width=87 style='width:65pt'>Character set,for example UTF-8</td>
+  <td class=xl68 width=87 style='width:65pt'>Character set, for example UTF-8</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
-  <td class=xl68 width=87 style='width:65pt'>subExpress,filter expression</td>
+  <td class=xl68 width=87 style='width:65pt'>subExpress, filter expression</td>
  </tr>
  <tr height=39 style='height:29.0pt'>
   <td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-b</td>
diff --git a/docs/en/operation.md b/docs/en/operation.md
index 016e718b9..9dff41ab7 100644
--- a/docs/en/operation.md
+++ b/docs/en/operation.md
@@ -583,6 +583,26 @@ The above Broker matches Slave by specifying the same BrokerName, Master's Broke
   <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
   <td class=xl68 width=87 style='width:65pt'>cluster name</td>
  </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+  height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+  <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
+  border-top:none;width:65pt'>delete Broker's expired CommitLog files.</td>
+  <td class=xl67 width=87 style='width:65pt'>-n</td>
+  <td class=xl68 width=87 style='width:65pt'>NameServer Service address, format is ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+  <td class=xl68 width=87 style='width:65pt'>print help info</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+  <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+  <td class=xl68 width=87 style='width:65pt'>Broker address, fomat isip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+  <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+  <td class=xl68 width=87 style='width:65pt'>cluster name</td>
+ </tr>
  <tr height=88 style='mso-height-source:userset;height:66.0pt'>
   <td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
   height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 44f64dbbe..2206bc175 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -438,6 +438,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
     }
 
+    @Override
+    public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.deleteExpiredCommitLog(cluster);
+    }
+
+    @Override
+    public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.deleteExpiredCommitLogByAddr(addr);
+    }
+
     @Override
     public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2d3bcd698..9e99925a8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -754,6 +754,42 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
+    @Override
+    public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = false;
+        try {
+            ClusterInfo clusterInfo = examineBrokerClusterInfo();
+            if (null == cluster || "".equals(cluster)) {
+                for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
+                    result = deleteExpiredCommitLogByCluster(clusterInfo, targetCluster);
+                }
+            } else {
+                result = deleteExpiredCommitLogByCluster(clusterInfo, cluster);
+            }
+        } catch (MQBrokerException e) {
+            log.error("deleteExpiredCommitLog error.", e);
+        }
+
+        return result;
+    }
+
+    public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
+        RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = false;
+        String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+        for (String addr : addrs) {
+            result = deleteExpiredCommitLogByAddr(addr);
+        }
+        return result;
+    }
+
+    @Override
+    public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        boolean result = mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr, timeoutMillis);
+        log.warn("Delete expired CommitLog on target " + addr + " broker " + result);
+        return result;
+    }
+
     @Override
     public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d746433ce..58e3fb713 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -205,6 +205,12 @@ public interface MQAdminExt extends MQAdmin {
     boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException;
 
+    boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, MQClientException, InterruptedException;
+
+    boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, MQClientException, InterruptedException;
+
     boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException;
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fca04e175..c69b7bf9e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
 import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import org.apache.rocketmq.tools.command.broker.DeleteExpiredCommitLogSubCommand;
 import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
 import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
 import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
@@ -201,6 +202,7 @@ public class MQAdminStartup {
 
         initCommand(new UpdateOrderConfCommand());
         initCommand(new CleanExpiredCQSubCommand());
+        initCommand(new DeleteExpiredCommitLogSubCommand());
         initCommand(new CleanUnusedTopicCommand());
 
         initCommand(new StartMonitoringSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
new file mode 100644
index 000000000..a4b2a51ad
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.broker;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * MQAdmin command which deletes expired CommitLog files
+ */
+public class DeleteExpiredCommitLogSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "deleteExpiredCommitLog";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Delete expired CommitLog files";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("n", "namesrvAddr", true, "Name server address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "clustername");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        if (commandLine.hasOption('n')) {
+            defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
+        }
+
+        try {
+            boolean result = false;
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                result = defaultMQAdminExt.deleteExpiredCommitLogByAddr(addr);
+
+            } else {
+                String cluster = commandLine.getOptionValue('c');
+                if (null != cluster)
+                    cluster = cluster.trim();
+                result = defaultMQAdminExt.deleteExpiredCommitLog(cluster);
+            }
+            System.out.printf(result ? "success" : "false");
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command execute failed.", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java
new file mode 100644
index 000000000..61b3acaa5
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeleteExpiredCommitLogSubCommandTest extends ServerResponseMocker {
+
+    private static final int PORT = 45678;
+
+    private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+    private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+    private final PrintStream originalOut = System.out;
+    private final PrintStream originalErr = System.err;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setOut(new PrintStream(outContent));
+        System.setErr(new PrintStream(errContent));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.setOut(originalOut);
+        System.setErr(originalErr);
+    }
+
+    @Override
+    protected int getPort() {
+        return PORT;
+    }
+
+    @Override
+    protected byte[] getBody() {
+        return null;
+    }
+
+    @Test
+    public void testExecute() throws SubCommandException {
+        DeleteExpiredCommitLogSubCommand cmd = new DeleteExpiredCommitLogSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
+        final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
+            cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+        Assert.assertTrue(outContent.toString().startsWith("success"));
+        Assert.assertEquals("", errContent.toString());
+    }
+}