You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/11 02:16:29 UTC
[rocketmq-mqtt] branch main updated: add channel decode exception ,replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJsonByte(obj),add some unit tests to verify these changes (#59)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 18709f8 add channel decode exception ,replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJsonByte(obj),add some unit tests to verify these changes (#59)
18709f8 is described below
commit 18709f8b4cbf7a81cfc64d4539671280a6f2cb31
Author: aliensb <al...@live.com>
AuthorDate: Mon Apr 11 10:16:25 2022 +0800
add channel decode exception ,replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJsonByte(obj),add some unit tests to verify these changes (#59)
---
.../rocketmq/mqtt/common/model/Constants.java | 4 +-
.../apache/rocketmq/mqtt/common/model/Message.java | 4 +-
.../mqtt/cs/channel/ChannelDecodeException.java | 40 +++++++++++++
.../rocketmq/mqtt/cs/channel/ConnectHandler.java | 5 +-
.../cs/protocol/mqtt/MqttPacketDispatcher.java | 3 +-
.../rocketmq/mqtt/cs/test/TestConnectHandler.java | 66 ++++++++++++++++++++++
.../rocketmq/mqtt/ds/notify/NotifyManager.java | 7 +--
.../ds/upstream/processor/PublishProcessor.java | 9 ++-
.../rocketmq/mqtt/ds/test/TestNotifyManager.java | 26 ++++++++-
9 files changed, 147 insertions(+), 17 deletions(-)
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f55afbb..9682bbe 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -29,8 +29,8 @@ public class Constants {
public static final String PROPERTY_NAMESPACE = "namespace";
public static final String PROPERTY_ORIGIN_MQTT_TOPIC = "originMqttTopic";
- public static final String PROPERTY_MQTT_QOS = "qoslevel";
- public static final String PROPERTY_MQTT_CLEAN_SESSION = "cleansessionflag";
+ public static final String PROPERTY_MQTT_QOS = "qosLevel";
+ public static final String PROPERTY_MQTT_CLEAN_SESSION = "cleanSessionFlag";
public static final String PROPERTY_MQTT_CLIENT = "clientId";
public static final String PROPERTY_MQTT_RETRY_TIMES = "retryTimes";
public static final String PROPERTY_MQTT_EXT_DATA = "extData";
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
index b3be5af..563401c 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
@@ -49,8 +49,8 @@ public class Message {
public static String propertyUserProperties = "extData";
public static String extPropertyMqttRealTopic = "mqttRealTopic";
- public static String extPropertyQoS = "qoslevel";
- public static String extPropertyCleanSessionFlag = "cleansessionflag";
+ public static String extPropertyQoS = "qosLevel";
+ public static String extPropertyCleanSessionFlag = "cleanSessionFlag";
public static String extPropertyNamespaceId = "namespace";
public static String extPropertyClientId = "clientId";
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java
new file mode 100644
index 0000000..354151c
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.channel;
+
+
+public class ChannelDecodeException extends RuntimeException {
+ public ChannelDecodeException() {}
+
+ public ChannelDecodeException(String message) {
+ super(message);
+ }
+
+ public ChannelDecodeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ChannelDecodeException(Throwable cause) {
+ super(cause);
+ }
+
+ public ChannelDecodeException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
index a891d0a..32481e3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
@@ -34,6 +34,8 @@ import java.util.List;
public class ConnectHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(ConnectHandler.class);
+ private final List<String> simpleExceptions = Collections.singletonList("Connection reset by peer");
+
@Resource
private ChannelManager channelManager;
@@ -49,7 +51,6 @@ public class ConnectHandler extends ChannelInboundHandlerAdapter {
channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "be closed");
}
- public final List<String> simpleExceptions = Arrays.asList("Connection reset by peer");
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
index 3443d96..5acf48d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.hook.UpstreamHookManager;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.util.HostInfo;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelDecodeException;
import org.apache.rocketmq.mqtt.cs.channel.ChannelException;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -100,7 +101,7 @@ public class MqttPacketDispatcher extends SimpleChannelInboundHandler<MqttMessag
return;
}
if (!msg.decoderResult().isSuccess()) {
- throw new RuntimeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
+ throw new ChannelDecodeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
}
ChannelInfo.touch(ctx.channel());
CompletableFuture<HookResult> upstreamHookResult;
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java
new file mode 100644
index 0000000..87bb6bf
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestConnectHandler {
+
+ @Mock
+ private ChannelHandlerContext ctx;
+ @Mock
+ private Logger logger;
+ @Mock
+ private Channel channel;
+ @Mock
+ private ChannelManager channelManager;
+
+ @Test
+ public void test() throws Exception {
+ ConnectHandler connectHandler=new ConnectHandler();
+ Throwable throwable=new Exception("Connection reset by peer");
+ when(ctx.channel()).thenReturn(channel);
+ FieldUtils.writeDeclaredField(connectHandler,"channelManager",channelManager,true);
+ FieldUtils.writeDeclaredField(connectHandler,"logger",logger,true);
+ connectHandler.exceptionCaught(ctx,throwable);
+ verify(logger,never()).error(anyString(),any(),any());
+ verify(channelManager).closeConnect(channel,ChannelCloseFrom.SERVER, throwable.getMessage());
+
+ connectHandler.exceptionCaught(ctx,new Exception("test exception"));
+ verify(logger,only()).error(anyString(),any(),any());
+ verify(channelManager).closeConnect(channel,ChannelCloseFrom.SERVER, throwable.getMessage());
+ }
+}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 01c25af..a6327b8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.mqtt.ds.notify;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -50,7 +50,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -269,7 +268,7 @@ public class NotifyManager {
private RemotingCommand createMsgEventCommand(Set<MessageEvent> messageEvents) {
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RpcCode.CMD_NOTIFY_MQTT_MESSAGE,
null);
- remotingCommand.setBody(JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8));
+ remotingCommand.setBody(JSON.toJSONBytes(messageEvents));
return remotingCommand;
}
@@ -281,7 +280,7 @@ public class NotifyManager {
}
Message message = new Message();
message.setTopic(serviceConf.getEventNotifyRetryTopic());
- message.setBody(JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8));
+ message.setBody(JSON.toJSONBytes(messageEvents));
message.setDelayTimeLevel(delayLevel);
message.putUserProperty(Constants.PROPERTY_MQTT_MSG_EVENT_RETRY_NODE, node);
message.putUserProperty(Constants.PROPERTY_MQTT_MSG_EVENT_RETRY_TIME, String.valueOf(retryTime + 1));
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index b01ad56..946f32a 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.mqtt.ds.upstream.processor;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
@@ -36,7 +36,6 @@ import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -66,9 +65,9 @@ public class PublishProcessor implements UpstreamProcessor {
Message message = MessageUtil.toMessage(mqttPublishMessage);
message.setMsgId(msgId);
message.setBornTimestamp(System.currentTimeMillis());
- CompletableFuture<StoreResult> storeResult = lmqQueueStore.putMessage(queueNames, message);
- return storeResult.thenCompose(storeResult1 -> HookResult.newHookResult(HookResult.SUCCESS, null,
- JSONObject.toJSONString(storeResult1).getBytes(StandardCharsets.UTF_8)));
+ CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
+ return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
+ JSON.toJSONBytes(storeResult)));
}
}
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
index 992e5e7..d5e4a4a 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
@@ -19,12 +19,15 @@
package org.apache.rocketmq.mqtt.ds.test;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
+import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.MessageEvent;
import org.apache.rocketmq.mqtt.common.model.RpcCode;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
@@ -33,18 +36,24 @@ import org.apache.rocketmq.mqtt.ds.notify.NotifyManager;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Set;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
@RunWith(MockitoJUnitRunner.class)
public class TestNotifyManager {
@@ -82,9 +91,24 @@ public class TestNotifyManager {
RemotingCommand response = mock(RemotingCommand.class);
when(response.getCode()).thenReturn(RpcCode.SUCCESS);
when(remotingClient.invokeSync(any(), any(), anyLong())).thenReturn(response);
-
notifyManager.notifyMessage(new HashSet<>(Arrays.asList(new MessageEvent())));
verify(remotingClient).invokeSync(any(), any(), anyLong());
}
+ @Test
+ public void testJsonByte() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ Set<MessageEvent> messageEvents=new HashSet<>();
+ for (int i=0;i<10;i++) {
+ MessageEvent messageEvent = new MessageEvent();
+ messageEvent.setBrokerName("testBroker"+i);
+ messageEvent.setPubTopic("testTopic"+i);
+ messageEvent.setNamespace("testSpace"+i);
+ messageEvents.add(messageEvent);
+ }
+ NotifyManager notifyManager = new NotifyManager();
+ RemotingCommand remotingCommand = (RemotingCommand) MethodUtils.invokeMethod(notifyManager, true, "createMsgEventCommand", messageEvents);
+ byte[] bytes = JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8);
+ Assert.assertArrayEquals(remotingCommand.getBody(), bytes);
+ }
+
}