You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/11 09:44:15 UTC
[rocketmq] branch develop updated: 修复协议解析漏洞 (#3475)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e1571ee 修复协议解析漏洞 (#3475)
e1571ee is described below
commit e1571eedb5e73abb4e9f1530e08494b4088af5a3
Author: Chengyang He <21...@users.noreply.github.com>
AuthorDate: Sat Dec 11 17:43:56 2021 +0800
修复协议解析漏洞 (#3475)
---
.../acl/plain/PlainAccessValidatorTest.java | 276 ++++++++++++++-------
.../rocketmq/remoting/common/RemotingHelper.java | 3 +-
.../remoting/protocol/RemotingCommand.java | 29 +--
.../remoting/protocol/RocketMQSerializable.java | 42 ++--
.../remoting/protocol/RemotingCommandTest.java | 61 +++--
.../protocol/RocketMQSerializableTest.java | 75 ++++--
6 files changed, 316 insertions(+), 170 deletions(-)
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index a0eb567..e6954c2 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Before;
@@ -49,6 +50,7 @@ public class PlainAccessValidatorTest {
private PlainAccessValidator plainAccessValidator;
private AclClientRPCHook aclClient;
private SessionCredentials sessionCredentials;
+
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
@@ -72,10 +74,16 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
- String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
+ String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
+
+ Assert.assertEquals(accessResource.getSignature(), signature);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
- Assert.assertEquals(accessResource.getSignature(), signature);
+ Assert.fail("Should not throw IOException");
+ }
}
@@ -90,8 +98,14 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@@ -106,8 +120,14 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -121,8 +141,14 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -143,74 +169,98 @@ public class PlainAccessValidatorTest {
@Test
public void validatePullMessageTest() {
- PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicC");
pullMessageRequestHeader.setConsumerGroup("consumerGroupA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,pullMessageRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateConsumeMessageBackTest() {
- ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader=new ConsumerSendMsgBackRequestHeader();
+ ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = new ConsumerSendMsgBackRequestHeader();
consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
consumerSendMsgBackRequestHeader.setGroup("consumerGroupA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,consumerSendMsgBackRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, consumerSendMsgBackRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateQueryMessageTest() {
- QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader();
+ QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader();
queryMessageRequestHeader.setTopic("topicC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateQueryMessageByKeyTest() {
- QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader();
+ QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader();
queryMessageRequestHeader.setTopic("topicC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
remotingCommand.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, "false");
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateHeartBeatTest() {
- HeartbeatData heartbeatData=new HeartbeatData();
- Set<ProducerData> producerDataSet=new HashSet<>();
- Set<ConsumerData> consumerDataSet=new HashSet<>();
- Set<SubscriptionData> subscriptionDataSet=new HashSet<>();
- ProducerData producerData=new ProducerData();
+ HeartbeatData heartbeatData = new HeartbeatData();
+ Set<ProducerData> producerDataSet = new HashSet<>();
+ Set<ConsumerData> consumerDataSet = new HashSet<>();
+ Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
+ ProducerData producerData = new ProducerData();
producerData.setGroupName("producerGroupA");
- ConsumerData consumerData=new ConsumerData();
+ ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName("consumerGroupA");
- SubscriptionData subscriptionData=new SubscriptionData();
+ SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic("topicC");
producerDataSet.add(producerData);
consumerDataSet.add(consumerData);
@@ -218,65 +268,89 @@ public class PlainAccessValidatorTest {
consumerData.setSubscriptionDataSet(subscriptionDataSet);
heartbeatData.setProducerDataSet(producerDataSet);
heartbeatData.setConsumerDataSet(consumerDataSet);
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
remotingCommand.setBody(heartbeatData.encode());
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encode();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateUnRegisterClientTest() {
- UnregisterClientRequestHeader unregisterClientRequestHeader=new UnregisterClientRequestHeader();
+ UnregisterClientRequestHeader unregisterClientRequestHeader = new UnregisterClientRequestHeader();
unregisterClientRequestHeader.setConsumerGroup("consumerGroupA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT,unregisterClientRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, unregisterClientRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateGetConsumerListByGroupTest() {
- GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader=new GetConsumerListByGroupRequestHeader();
+ GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = new GetConsumerListByGroupRequestHeader();
getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,getConsumerListByGroupRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, getConsumerListByGroupRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
public void validateUpdateConsumerOffSetTest() {
- UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader=new UpdateConsumerOffsetRequestHeader();
+ UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = new UpdateConsumerOffsetRequestHeader();
updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,updateConsumerOffsetRequestHeader);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test(expected = AclException.class)
public void validateNullAccessKeyTest() {
- SessionCredentials sessionCredentials=new SessionCredentials();
+ SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ1");
sessionCredentials.setSecretKey("1234");
- AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
+ AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
@@ -286,16 +360,22 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test(expected = AclException.class)
public void validateErrorSecretKeyTest() {
- SessionCredentials sessionCredentials=new SessionCredentials();
+ SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("1234");
- AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
+ AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
@@ -305,8 +385,14 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -318,8 +404,14 @@ public class PlainAccessValidatorTest {
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
- plainAccessValidator.validate(accessResource);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -349,7 +441,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get("accounts");
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) readableMap.get("accounts");
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get("accessKey").equals(plainAccessConfig.getAccessKey())) {
@@ -358,17 +450,17 @@ public class PlainAccessValidatorTest {
}
}
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"1234567890");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"SUB");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE),false);
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR),"192.168.0.*");
- Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
- Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY), "1234567890");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM), "SUB");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM), "PUB");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE), false);
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR), "192.168.0.*");
+ Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(), 2);
+ Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(), 2);
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get("dataVersion");
- Assert.assertEquals(1,dataVersions.get(0).get("counter"));
+ Assert.assertEquals(1, dataVersions.get(0).get("counter"));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
@@ -391,7 +483,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
@@ -399,7 +491,7 @@ public class PlainAccessValidatorTest {
break;
}
}
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY), "123456789111");
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
@@ -433,7 +525,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
@@ -441,19 +533,19 @@ public class PlainAccessValidatorTest {
break;
}
}
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"DENY");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
- Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
- Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
- Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB"));
- Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB"));
- Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB"));
- Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY"));
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY), "123456789111");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM), "DENY");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM), "PUB");
+ Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(), 2);
+ Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(), 2);
+ Assert.assertTrue(((List) verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB"));
+ Assert.assertTrue(((List) verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB"));
+ Assert.assertTrue(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB"));
+ Assert.assertTrue(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY"));
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
- Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+ Assert.assertEquals(1, dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Update element in the acl config yaml file
PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
@@ -464,7 +556,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig2);
Map<String, Object> readableMap2 = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<Map<String, Object>> accounts2 = (List<Map<String, Object>>)readableMap2.get(AclConstants.CONFIG_ACCOUNTS);
+ List<Map<String, Object>> accounts2 = (List<Map<String, Object>>) readableMap2.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap2 = null;
for (Map<String, Object> account : accounts2) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig2.getAccessKey())) {
@@ -475,8 +567,8 @@ public class PlainAccessValidatorTest {
// Verify the dateversion element after updating is correct or not
List<Map<String, Object>> dataVersions2 = (List<Map<String, Object>>) readableMap2.get(AclConstants.CONFIG_DATA_VERSION);
- Assert.assertEquals(2,dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER));
- Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY),"1234567890123");
+ Assert.assertEquals(2, dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER));
+ Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY), "1234567890123");
// Restore the backup file and flush to yaml file
@@ -511,7 +603,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.deleteAccessConfig(accessKey);
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> verifyMap = null;
for (Map<String, Object> account : accounts) {
if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accessKey)) {
@@ -521,10 +613,10 @@ public class PlainAccessValidatorTest {
}
// Verify the specified element is removed or not
- Assert.assertEquals(verifyMap,null);
+ Assert.assertEquals(verifyMap, null);
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
- Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+ Assert.assertEquals(1, dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
@@ -596,21 +688,21 @@ public class PlainAccessValidatorTest {
Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
- List<String> globalWhiteAddrList = (List<String>)readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
+ List<String> globalWhiteAddrList = (List<String>) readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.1"));
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.2"));
Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.3"));
// Verify the dateversion element is correct or not
List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
- Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+ Assert.assertEquals(1, dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
// Restore the backup file and flush to yaml file
AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
}
@Test
- public void getAllAclConfigTest(){
+ public void getAllAclConfigTest() {
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
AclConfig aclConfig = plainAccessValidator.getAllAclConfig();
Assert.assertEquals(aclConfig.getGlobalWhiteAddrs().size(), 2);
@@ -619,7 +711,7 @@ public class PlainAccessValidatorTest {
@Test
- public void updateAccessConfigEmptyPermListTest(){
+ public void updateAccessConfigEmptyPermListTest() {
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
String accessKey = "updateAccessConfigEmptyPerm";
@@ -632,14 +724,14 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig);
PlainAccessConfig result = plainAccessValidator.getAllAclConfig().getPlainAccessConfigs()
- .stream().filter(c->c.getAccessKey().equals(accessKey)).findFirst().orElse(null);
+ .stream().filter(c -> c.getAccessKey().equals(accessKey)).findFirst().orElse(null);
Assert.assertEquals(0, result.getTopicPerms().size());
plainAccessValidator.deleteAccessConfig(accessKey);
}
@Test
- public void updateAccessConfigEmptyWhiteRemoteAddressTest(){
+ public void updateAccessConfigEmptyWhiteRemoteAddressTest() {
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
String accessKey = "updateAccessConfigEmptyWhiteRemoteAddress";
@@ -652,7 +744,7 @@ public class PlainAccessValidatorTest {
plainAccessValidator.updateAccessConfig(plainAccessConfig);
PlainAccessConfig result = plainAccessValidator.getAllAclConfig().getPlainAccessConfigs()
- .stream().filter(c->c.getAccessKey().equals(accessKey)).findFirst().orElse(null);
+ .stream().filter(c -> c.getAccessKey().equals(accessKey)).findFirst().orElse(null);
Assert.assertEquals("", result.getWhiteRemoteAddress());
plainAccessValidator.deleteAccessConfig(accessKey);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 7dacea9..1a36666 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -61,7 +62,7 @@ public class RemotingHelper {
public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException {
+ RemotingSendRequestException, RemotingTimeoutException, RemotingCommandException {
long beginTime = System.currentTimeMillis();
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..0eb01c9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,6 +17,13 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -24,12 +31,6 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
@@ -39,7 +40,7 @@ public class RemotingCommand {
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
private static final int RPC_ONEWAY = 1; // 0, RPC
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
- new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+ new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
// 1, Oneway
// 1, RESPONSE_COMMAND
@@ -111,7 +112,7 @@ public class RemotingCommand {
}
public static RemotingCommand createResponseCommand(int code, String remark,
- Class<? extends CommandCustomHeader> classHeader) {
+ Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.markResponseType();
cmd.setCode(code);
@@ -136,12 +137,12 @@ public class RemotingCommand {
return createResponseCommand(code, remark, null);
}
- public static RemotingCommand decode(final byte[] array) {
+ public static RemotingCommand decode(final byte[] array) throws RemotingCommandException {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
- public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+ public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
@@ -166,7 +167,7 @@ public class RemotingCommand {
return length & 0xFFFFFF;
}
- private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+ private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
@@ -232,7 +233,7 @@ public class RemotingCommand {
}
public CommandCustomHeader decodeCommandCustomHeader(
- Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
+ Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
@@ -529,8 +530,8 @@ public class RemotingCommand {
@Override
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
- + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
- + serializeTypeCurrentRPC + "]";
+ + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ + serializeTypeCurrentRPC + "]";
}
public SerializeType getSerializeTypeCurrentRPC() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 66119e0..6075d7a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.remoting.protocol;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -85,10 +87,10 @@ public class RocketMQSerializable {
Map.Entry<String, String> entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
kvLength =
- // keySize + Key
- 2 + entry.getKey().getBytes(CHARSET_UTF8).length
- // valSize + val
- + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
+ // keySize + Key
+ 2 + entry.getKey().getBytes(CHARSET_UTF8).length
+ // valSize + val
+ + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
totalLength += kvLength;
}
}
@@ -117,23 +119,23 @@ public class RocketMQSerializable {
private static int calTotalLen(int remark, int ext) {
// int code(~32767)
int length = 2
- // LanguageCode language
- + 1
- // int version(~32767)
- + 2
- // int opaque
- + 4
- // int flag
- + 4
- // String remark
- + 4 + remark
- // HashMap<String, String> extFields
- + 4 + ext;
+ // LanguageCode language
+ + 1
+ // int version(~32767)
+ + 2
+ // int opaque
+ + 4
+ // int flag
+ + 4
+ // String remark
+ + 4 + remark
+ // HashMap<String, String> extFields
+ + 4 + ext;
return length;
}
- public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+ public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
@@ -149,6 +151,9 @@ public class RocketMQSerializable {
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
+ if (remarkLength > headerArray.length) {
+ throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length);
+ }
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
@@ -157,6 +162,9 @@ public class RocketMQSerializable {
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
+ if (extFieldsLength > headerArray.length) {
+ throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length);
+ }
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
cmd.setExtFields(mapDeserialize(extFieldsBytes));
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..0b19d44 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -118,10 +119,18 @@ public class RemotingCommandTest {
buffer.get(bytes, 0, buffer.limit() - 4);
buffer = ByteBuffer.wrap(bytes);
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
+ RemotingCommand decodedCommand = null;
+ try {
+ decodedCommand = RemotingCommand.decode(buffer);
+
+ assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
+ assertThat(decodedCommand.getBody()).isNull();
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+
- assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
- assertThat(decodedCommand.getBody()).isNull();
}
@Test
@@ -141,10 +150,16 @@ public class RemotingCommandTest {
buffer.get(bytes, 0, buffer.limit() - 4);
buffer = ByteBuffer.wrap(bytes);
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
+ RemotingCommand decodedCommand = null;
+ try {
+ decodedCommand = RemotingCommand.decode(buffer);
- assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
- assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4});
+ assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
+ assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4});
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -165,22 +180,30 @@ public class RemotingCommandTest {
buffer.get(bytes, 0, buffer.limit() - 4);
buffer = ByteBuffer.wrap(bytes);
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
+ RemotingCommand decodedCommand = null;
+ try {
+ decodedCommand = RemotingCommand.decode(buffer);
+
+ assertThat(decodedCommand.getExtFields().get("stringValue")).isEqualTo("bilibili");
+ assertThat(decodedCommand.getExtFields().get("intValue")).isEqualTo("2333");
+ assertThat(decodedCommand.getExtFields().get("longValue")).isEqualTo("23333333");
+ assertThat(decodedCommand.getExtFields().get("booleanValue")).isEqualTo("true");
+ assertThat(decodedCommand.getExtFields().get("doubleValue")).isEqualTo("0.618");
+
+ assertThat(decodedCommand.getExtFields().get("key")).isEqualTo("value");
- assertThat(decodedCommand.getExtFields().get("stringValue")).isEqualTo("bilibili");
- assertThat(decodedCommand.getExtFields().get("intValue")).isEqualTo("2333");
- assertThat(decodedCommand.getExtFields().get("longValue")).isEqualTo("23333333");
- assertThat(decodedCommand.getExtFields().get("booleanValue")).isEqualTo("true");
- assertThat(decodedCommand.getExtFields().get("doubleValue")).isEqualTo("0.618");
+ CommandCustomHeader decodedHeader = decodedCommand.decodeCommandCustomHeader(ExtFieldsHeader.class);
+ assertThat(((ExtFieldsHeader) decodedHeader).getStringValue()).isEqualTo("bilibili");
+ assertThat(((ExtFieldsHeader) decodedHeader).getIntValue()).isEqualTo(2333);
+ assertThat(((ExtFieldsHeader) decodedHeader).getLongValue()).isEqualTo(23333333l);
+ assertThat(((ExtFieldsHeader) decodedHeader).isBooleanValue()).isEqualTo(true);
+ assertThat(((ExtFieldsHeader) decodedHeader).getDoubleValue()).isBetween(0.617, 0.619);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
- assertThat(decodedCommand.getExtFields().get("key")).isEqualTo("value");
+ Assert.fail("Should not throw IOException");
+ }
- CommandCustomHeader decodedHeader = decodedCommand.decodeCommandCustomHeader(ExtFieldsHeader.class);
- assertThat(((ExtFieldsHeader) decodedHeader).getStringValue()).isEqualTo("bilibili");
- assertThat(((ExtFieldsHeader) decodedHeader).getIntValue()).isEqualTo(2333);
- assertThat(((ExtFieldsHeader) decodedHeader).getLongValue()).isEqualTo(23333333l);
- assertThat(((ExtFieldsHeader) decodedHeader).isBooleanValue()).isEqualTo(true);
- assertThat(((ExtFieldsHeader) decodedHeader).getDoubleValue()).isBetween(0.617, 0.619);
}
@Test
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..149ec72 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.remoting.protocol;
import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,15 +45,22 @@ public class RocketMQSerializableTest {
assertThat(parseToInt(result, 13)).isEqualTo(0); //empty remark
assertThat(parseToInt(result, 17)).isEqualTo(0); //empty extFields
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
-
- assertThat(decodedCommand.getCode()).isEqualTo(code);
- assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
- assertThat(decodedCommand.getVersion()).isEqualTo(2333);
- assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
- assertThat(decodedCommand.getFlag()).isEqualTo(0);
- assertThat(decodedCommand.getRemark()).isNull();
- assertThat(decodedCommand.getExtFields()).isNull();
+ RemotingCommand decodedCommand = null;
+ try {
+ decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+
+ assertThat(decodedCommand.getCode()).isEqualTo(code);
+ assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(decodedCommand.getVersion()).isEqualTo(2333);
+ assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
+ assertThat(decodedCommand.getFlag()).isEqualTo(0);
+ assertThat(decodedCommand.getRemark()).isNull();
+ assertThat(decodedCommand.getExtFields()).isNull();
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -80,15 +90,21 @@ public class RocketMQSerializableTest {
assertThat(parseToInt(result, 30)).isEqualTo(0); //empty extFields
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
-
- assertThat(decodedCommand.getCode()).isEqualTo(code);
- assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
- assertThat(decodedCommand.getVersion()).isEqualTo(2333);
- assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
- assertThat(decodedCommand.getFlag()).isEqualTo(0);
- assertThat(decodedCommand.getRemark()).contains("Sample Remark");
- assertThat(decodedCommand.getExtFields()).isNull();
+ try {
+ RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+
+ assertThat(decodedCommand.getCode()).isEqualTo(code);
+ assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(decodedCommand.getVersion()).isEqualTo(2333);
+ assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
+ assertThat(decodedCommand.getFlag()).isEqualTo(0);
+ assertThat(decodedCommand.getRemark()).contains("Sample Remark");
+ assertThat(decodedCommand.getExtFields()).isNull();
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test
@@ -118,15 +134,20 @@ public class RocketMQSerializableTest {
HashMap<String, String> extFields = RocketMQSerializable.mapDeserialize(extFieldsArray);
assertThat(extFields).contains(new HashMap.SimpleEntry("key", "value"));
- RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
-
- assertThat(decodedCommand.getCode()).isEqualTo(code);
- assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
- assertThat(decodedCommand.getVersion()).isEqualTo(2333);
- assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
- assertThat(decodedCommand.getFlag()).isEqualTo(0);
- assertThat(decodedCommand.getRemark()).isNull();
- assertThat(decodedCommand.getExtFields()).contains(new HashMap.SimpleEntry("key", "value"));
+ try {
+ RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+ assertThat(decodedCommand.getCode()).isEqualTo(code);
+ assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(decodedCommand.getVersion()).isEqualTo(2333);
+ assertThat(decodedCommand.getOpaque()).isEqualTo(opaque);
+ assertThat(decodedCommand.getFlag()).isEqualTo(0);
+ assertThat(decodedCommand.getRemark()).isNull();
+ assertThat(decodedCommand.getExtFields()).contains(new HashMap.SimpleEntry("key", "value"));
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
}
@Test