You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/14 09:18:20 UTC
[rocketmq] branch develop updated: [ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)
This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fcfe26e44 [ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)
fcfe26e44 is described below
commit fcfe26e4443dd24b1055899266d1bd81060ee118
Author: cnScarb <jj...@163.com>
AuthorDate: Tue Jun 14 17:18:14 2022 +0800
[ISSUE #4037] Add DeleteExpiredCommitLogSubCommand (#4038)
* [ISSUE #4037] Merge with origin
* [ISSUE #4048] Add assertions for DeleteExpiredCommitLogSubCommandTest
---
.../broker/processor/AdminBrokerProcessor.java | 12 +++
.../rocketmq/client/impl/MQClientAPIImpl.java | 16 ++++
.../rocketmq/common/protocol/RequestCode.java | 2 +
docs/cn/operation.md | 30 +++++++-
docs/en/CLITools.md | 50 ++++++++----
docs/en/operation.md | 20 +++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 12 +++
.../tools/admin/DefaultMQAdminExtImpl.java | 36 +++++++++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 6 ++
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../broker/DeleteExpiredCommitLogSubCommand.java | 88 ++++++++++++++++++++++
.../DeleteExpiredCommitLogSubCommandTest.java | 74 ++++++++++++++++++
12 files changed, 332 insertions(+), 16 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 24f52f54a..dba97200c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,6 +212,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return this.getSystemTopicListFromBroker(ctx, request);
case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
return this.cleanExpiredConsumeQueue();
+ case RequestCode.DELETE_EXPIRED_COMMITLOG:
+ return this.deleteExpiredCommitLog();
case RequestCode.CLEAN_UNUSED_TOPIC:
return this.cleanUnusedTopic();
case RequestCode.GET_CONSUMER_RUNNING_INFO:
@@ -1187,6 +1189,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
+    /**
+     * Serves the {@code DELETE_EXPIRED_COMMITLOG} admin request: triggers the message
+     * store's manual file deletion and reports success.
+     *
+     * @return a response command with code {@code ResponseCode.SUCCESS} and a {@code null} remark
+     */
+    public RemotingCommand deleteExpiredCommitLog() {
+        log.warn("invoke deleteExpiredCommitLog start.");
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        brokerController.getMessageStore().executeDeleteFilesManually();
+        log.warn("invoke deleteExpiredCommitLog end.");
+
+        // The deletion call above returned normally, so the reply is always SUCCESS.
+        reply.setRemark(null);
+        reply.setCode(ResponseCode.SUCCESS);
+        return reply;
+    }
+
public RemotingCommand cleanUnusedTopic() {
log.warn("invoke cleanUnusedTopic start.");
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 73cbb887f..245195f07 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1866,6 +1866,22 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
+    /**
+     * Sends a {@code DELETE_EXPIRED_COMMITLOG} request to one broker and waits for its reply.
+     *
+     * @param addr broker address; mapped through {@code MixAll.brokerVIPChannel} before the call
+     * @param timeoutMillis timeout handed to the synchronous remoting invocation
+     * @return {@code true} when the broker replies with {@code ResponseCode.SUCCESS}
+     * @throws MQClientException carrying the response code and remark for any other reply code
+     */
+    public boolean deleteExpiredCommitLog(final String addr, long timeoutMillis) throws MQClientException,
+        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        final RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.DELETE_EXPIRED_COMMITLOG, null);
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+
+        if (response.getCode() == ResponseCode.SUCCESS) {
+            return true;
+        }
+        // Every non-success code is surfaced to the caller as an exception.
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
public boolean cleanUnusedTopicByAddr(final String addr,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 24c072bc8..9912382ff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -192,4 +192,6 @@ public class RequestCode {
public static final int ADD_WRITE_PERM_OF_BROKER = 327;
public static final int GET_ALL_PRODUCER_INFO = 328;
+
+ // Admin request code: ask a broker to delete its expired CommitLog files.
+ public static final int DELETE_EXPIRED_COMMITLOG = 329;
}
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
index 692df2cca..737fd2527 100644
--- a/docs/cn/operation.md
+++ b/docs/cn/operation.md
@@ -566,6 +566,14 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
</tr>
+ <tr height=57 style='height:43.0pt'>
+ <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
+ <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+ <td class=xl68 width=87 style='width:65pt'>打印帮助</td>
+ </tr>
<tr height=57 style='height:43.0pt'>
<td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
height:103.0pt;border-top:none;width:143pt'>addWritePerm</td>
@@ -573,7 +581,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
border-top:none;width:65pt'>从NameServer上添加 Broker写权限</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
- </tr>
+ </tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
@@ -602,6 +610,26 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
<td class=xl68 width=87 style='width:65pt'>集群名称</td>
</tr>
+ <tr height=57 style='height:43.0pt'>
+ <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+ height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+ <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt;
+ border-top:none;width:65pt'>清理Broker上过期的CommitLog文件,Broker最多会执行20次删除操作,每次最多删除10个文件</td>
+ <td class=xl67 width=87 style='width:65pt'>-n</td>
+ <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+ <td class=xl68 width=87 style='width:65pt'>打印帮助</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+ <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+ <td class=xl68 width=87 style='width:65pt'>Broker 地址,地址为ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+ <td class=xl68 width=87 style='width:65pt'>集群名称</td>
+ </tr>
<tr height=88 style='mso-height-source:userset;height:66.0pt'>
<td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
diff --git a/docs/en/CLITools.md b/docs/en/CLITools.md
index 41ad3aeac..208ae44f3 100644
--- a/docs/en/CLITools.md
+++ b/docs/en/CLITools.md
@@ -184,15 +184,15 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl65 width=149 style='height:17.0pt;width:112pt'>-t</td>
- <td class=xl66 width=159 style='width:119pt'>topic,key</td>
+ <td class=xl66 width=159 style='width:119pt'>topic, key</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl65 width=149 style='height:29.0pt;width:112pt'>-v</td>
- <td class=xl66 width=159 style='width:119pt'>orderConf,value</td>
+ <td class=xl66 width=159 style='width:119pt'>orderConf, value</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl65 width=149 style='height:43.0pt;width:112pt'>-m</td>
- <td class=xl66 width=159 style='width:119pt'>method,available values include get, put, delete</td>
+ <td class=xl66 width=159 style='width:119pt'>method, available values include get, put, delete</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td rowspan=4 height=198 class=xl68 width=163 style='border-bottom:1.0pt;
@@ -277,7 +277,7 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl65 width=177 style='height:29.0pt;width:133pt'>-i</td>
- <td class=xl66 width=185 style='width:139pt'>Print interval,unit basis is seconds</td>
+ <td class=xl66 width=185 style='width:139pt'>Print interval, unit basis is seconds</td>
</tr>
<tr height=95 style='height:71.0pt'>
<td rowspan=8 height=391 class=xl67 width=177 style='border-bottom:1.0pt;
@@ -285,11 +285,11 @@ Before introducing the mqadmin management tool, the following points need to be
<td rowspan=8 class=xl70 width=175 style='border-bottom:1.0pt;
border-top:none;width:131pt'>Send message to detect each broker RT of the cluster.the message send to ${BrokerName} Topic</td>
<td class=xl65 width=177 style='width:133pt'>-a</td>
- <td class=xl66 width=185 style='width:139pt'>amount,total number per probe,RT = Total time/amount</td>
+ <td class=xl66 width=185 style='width:139pt'>amount, total number per probe, RT = Total time/amount</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl65 width=177 style='height:29.0pt;width:133pt'>-s</td>
- <td class=xl66 width=185 style='width:139pt'>Message size,unit basis is B</td>
+ <td class=xl66 width=185 style='width:139pt'>Message size, unit basis is B</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl65 width=177 style='height:17.0pt;width:133pt'>-c</td>
@@ -446,7 +446,27 @@ Before introducing the mqadmin management tool, the following points need to be
<td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
height:120.0pt;border-top:none;width:143pt'>cleanExpiredCQ</td>
<td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
- border-top:none;width:65pt'>Clean up expired consume Queue on broker,An expired queue may be generated if the number of columns is reduced manually</td>
+ border-top:none;width:65pt'>Clean up expired consume Queue on broker. An expired queue may be generated if the number of columns is reduced manually</td>
+ <td class=xl67 width=87 style='width:65pt'>-n</td>
+ <td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+ <td class=xl68 width=87 style='width:65pt'>Print help information</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+ <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+ <td class=xl68 width=87 style='width:65pt'>Declare the address of the broker and format as ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+ <td class=xl68 width=87 style='width:65pt'>Used to specify the name of the cluster</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+ <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+ height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+ <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt;
+ border-top:none;width:65pt'>Clean up expired CommitLog files on broker. A maximum of 20 deletion operations can be performed, and a maximum of 10 files can be deleted each time.</td>
<td class=xl67 width=87 style='width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
</tr>
@@ -466,7 +486,7 @@ Before introducing the mqadmin management tool, the following points need to be
<td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
<td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
- border-top:none;width:65pt'>Clean up unused topic on broker and release topic's consume Queue from memory,If the topic is removed manually, an unused topic will be generated</td>
+ border-top:none;width:65pt'>Clean up unused topic on broker and release topic's consume Queue from memory. If the topic is removed manually, an unused topic will be generated</td>
<td class=xl67 width=87 style='width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
</tr>
@@ -496,11 +516,11 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
- <td class=xl68 width=87 style='width:65pt'>brokerName,note that this is not broker's address</td>
+ <td class=xl68 width=87 style='width:65pt'>brokerName, note that this is not broker's address</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
- <td class=xl68 width=87 style='width:65pt'>Message size,the unit of account is B</td>
+ <td class=xl68 width=87 style='width:65pt'>Message size, the unit of account is B</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
@@ -660,7 +680,7 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-p</td>
- <td class=xl68 width=87 style='width:65pt'>body,message body</td>
+ <td class=xl68 width=87 style='width:65pt'>body, message body</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-k</td>
@@ -740,11 +760,11 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-c</td>
- <td class=xl68 width=87 style='width:65pt'>Character set,for example UTF-8</td>
+ <td class=xl68 width=87 style='width:65pt'>Character set, for example UTF-8</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
- <td class=xl68 width=87 style='width:65pt'>subExpress,filter expression</td>
+ <td class=xl68 width=87 style='width:65pt'>subExpress, filter expression</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-b</td>
@@ -784,11 +804,11 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-c</td>
- <td class=xl68 width=87 style='width:65pt'>Character set,for example UTF-8</td>
+ <td class=xl68 width=87 style='width:65pt'>Character set, for example UTF-8</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
- <td class=xl68 width=87 style='width:65pt'>subExpress,filter expression</td>
+ <td class=xl68 width=87 style='width:65pt'>subExpress, filter expression</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-b</td>
diff --git a/docs/en/operation.md b/docs/en/operation.md
index 016e718b9..9dff41ab7 100644
--- a/docs/en/operation.md
+++ b/docs/en/operation.md
@@ -583,6 +583,26 @@ The above Broker matches Slave by specifying the same BrokerName, Master's Broke
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
<td class=xl68 width=87 style='width:65pt'>cluster name</td>
</tr>
+ <tr height=57 style='height:43.0pt'>
+ <td rowspan=4 height=160 class=xl69 width=191 style='border-bottom:1.0pt;
+ height:120.0pt;border-top:none;width:143pt'>deleteExpiredCommitLog</td>
+ <td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt;
+ border-top:none;width:65pt'>delete Broker's expired CommitLog files.</td>
+ <td class=xl67 width=87 style='width:65pt'>-n</td>
+ <td class=xl68 width=87 style='width:65pt'>NameServer Service address, format is ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
+ <td class=xl68 width=87 style='width:65pt'>print help info</td>
+ </tr>
+ <tr height=57 style='height:43.0pt'>
+ <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-b</td>
+ <td class=xl68 width=87 style='width:65pt'>Broker address, format is ip:port</td>
+ </tr>
+ <tr height=23 style='height:17.0pt'>
+ <td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-c</td>
+ <td class=xl68 width=87 style='width:65pt'>cluster name</td>
+ </tr>
<tr height=88 style='mso-height-source:userset;height:66.0pt'>
<td rowspan=4 height=191 class=xl69 width=191 style='border-bottom:1.0pt;
height:143.0pt;border-top:none;width:143pt'>cleanUnusedTopic</td>
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 44f64dbbe..2206bc175 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -438,6 +438,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
}
+ @Override
+ public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.deleteExpiredCommitLog(cluster);
+ }
+
+ @Override
+ public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.deleteExpiredCommitLogByAddr(addr);
+ }
+
@Override
public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2d3bcd698..9e99925a8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -754,6 +754,42 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return result;
}
+    /**
+     * Deletes expired CommitLog files on the brokers of one cluster, or of every cluster
+     * when {@code cluster} is {@code null} or empty.
+     *
+     * <p>The returned flag is the outcome of the last per-cluster call only; earlier
+     * outcomes are overwritten. A {@code MQBrokerException} is logged and not rethrown,
+     * in which case the flag keeps whatever value it had at that point.
+     *
+     * @param cluster cluster name, or {@code null}/empty to target all clusters
+     * @return result of the last per-cluster deletion, {@code false} if none completed
+     */
+    @Override
+    public boolean deleteExpiredCommitLog(String cluster)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        MQClientException, InterruptedException {
+        boolean deleted = false;
+        try {
+            final ClusterInfo brokerClusters = examineBrokerClusterInfo();
+            final boolean singleCluster = cluster != null && !cluster.isEmpty();
+            if (singleCluster) {
+                deleted = deleteExpiredCommitLogByCluster(brokerClusters, cluster);
+            } else {
+                for (String clusterName : brokerClusters.retrieveAllClusterNames()) {
+                    deleted = deleteExpiredCommitLogByCluster(brokerClusters, clusterName);
+                }
+            }
+        } catch (MQBrokerException e) {
+            log.error("deleteExpiredCommitLog error.", e);
+        }
+        return deleted;
+    }
+
+ public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = false;
+ String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+ for (String addr : addrs) {
+ result = deleteExpiredCommitLogByAddr(addr);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr, timeoutMillis);
+ log.warn("Delete expired CommitLog on target " + addr + " broker " + result);
+ return result;
+ }
+
@Override
public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d746433ce..58e3fb713 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -205,6 +205,12 @@ public interface MQAdminExt extends MQAdmin {
boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException;
+ /**
+ * Deletes expired CommitLog files on the brokers of the given cluster.
+ * In DefaultMQAdminExtImpl a null or empty cluster name targets every cluster.
+ *
+ * @param cluster cluster name
+ * @return outcome flag reported by the implementation
+ */
+ boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ /**
+ * Deletes expired CommitLog files on the single broker at the given address.
+ *
+ * @param addr broker address
+ * @return outcome flag reported by the implementation
+ */
+ boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fca04e175..c69b7bf9e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import org.apache.rocketmq.tools.command.broker.DeleteExpiredCommitLogSubCommand;
import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
@@ -201,6 +202,7 @@ public class MQAdminStartup {
initCommand(new UpdateOrderConfCommand());
initCommand(new CleanExpiredCQSubCommand());
+ initCommand(new DeleteExpiredCommitLogSubCommand());
initCommand(new CleanUnusedTopicCommand());
initCommand(new StartMonitoringSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
new file mode 100644
index 000000000..a4b2a51ad
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.broker;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * MQAdmin sub-command {@code deleteExpiredCommitLog}: asks either one broker
+ * (option {@code -b}) or the brokers of a cluster (option {@code -c}) to delete
+ * expired CommitLog files. When both are given, {@code -b} takes precedence.
+ */
+public class DeleteExpiredCommitLogSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "deleteExpiredCommitLog";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Delete expired CommitLog files";
+    }
+
+    /**
+     * Registers the optional {@code -n}, {@code -b} and {@code -c} options.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance, with the three options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("n", "namesrvAddr", true, "Name server address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "clustername");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Starts an admin client, performs the deletion and prints {@code success} or
+     * {@code false} to standard output (no trailing newline).
+     *
+     * @param commandLine parsed command line; {@code -b} selects a single broker, otherwise
+     *                    the (possibly absent) {@code -c} value is used as the cluster name
+     * @param options not read by this command
+     * @param rpcHook hook handed to the admin client
+     * @throws SubCommandException wrapping any exception raised while starting the client
+     *                             or executing the request
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        if (commandLine.hasOption('n')) {
+            defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
+        }
+
+        try {
+            defaultMQAdminExt.start();
+            final boolean result;
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                result = defaultMQAdminExt.deleteExpiredCommitLogByAddr(addr);
+            } else {
+                // -c is optional: a null cluster name is passed through as-is.
+                String cluster = commandLine.getOptionValue('c');
+                if (null != cluster) {
+                    cluster = cluster.trim();
+                }
+                result = defaultMQAdminExt.deleteExpiredCommitLog(cluster);
+            }
+            // print rather than printf: the text is a fixed literal, not a format string.
+            System.out.print(result ? "success" : "false");
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command execute failed.", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java
new file mode 100644
index 000000000..61b3acaa5
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommandTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Executes {@link DeleteExpiredCommitLogSubCommand} with a broker address on {@code PORT}
+ * and checks what the command writes to standard output and standard error.
+ * NOTE(review): presumably {@link ServerResponseMocker} serves a mock response on
+ * {@link #getPort()} with {@link #getBody()} as payload; confirm against that class.
+ */
+public class DeleteExpiredCommitLogSubCommandTest extends ServerResponseMocker {
+
+    private static final int PORT = 45678;
+
+    // Real streams, kept so tearDown() can put them back.
+    private final PrintStream savedOut = System.out;
+    private final PrintStream savedErr = System.err;
+    // In-memory sinks that replace System.out / System.err during the test.
+    private final ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
+    private final ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setOut(new PrintStream(capturedOut));
+        System.setErr(new PrintStream(capturedErr));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.setOut(savedOut);
+        System.setErr(savedErr);
+    }
+
+    @Override
+    protected int getPort() {
+        return PORT;
+    }
+
+    @Override
+    protected byte[] getBody() {
+        return null;
+    }
+
+    @Test
+    public void testExecute() throws SubCommandException {
+        final String[] subargs = {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
+        final DeleteExpiredCommitLogSubCommand cmd = new DeleteExpiredCommitLogSubCommand();
+        final Options options = ServerUtil.buildCommandlineOptions(new Options());
+        final Options commandOptions = cmd.buildCommandlineOptions(options);
+        final CommandLine commandLine = ServerUtil.parseCmdLine(
+            "mqadmin " + cmd.commandName(), subargs, commandOptions, new PosixParser());
+
+        cmd.execute(commandLine, options, null);
+
+        // The command prints "success" on stdout and must leave stderr untouched.
+        Assert.assertTrue(capturedOut.toString().startsWith("success"));
+        Assert.assertEquals("", capturedErr.toString());
+    }
+}