You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/11 02:16:29 UTC

[rocketmq-mqtt] branch main updated: add channel decode exception, replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJSONBytes(obj), add some unit tests to verify these changes (#59)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 18709f8  add channel decode exception, replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJSONBytes(obj), add some unit tests to verify these changes (#59)
18709f8 is described below

commit 18709f8b4cbf7a81cfc64d4539671280a6f2cb31
Author: aliensb <al...@live.com>
AuthorDate: Mon Apr 11 10:16:25 2022 +0800

    add channel decode exception, replace JSONObject.toJSONString(obj).getBytes(StandardCharsets.UTF_8) with JSON.toJSONBytes(obj), add some unit tests to verify these changes (#59)
---
 .../rocketmq/mqtt/common/model/Constants.java      |  4 +-
 .../apache/rocketmq/mqtt/common/model/Message.java |  4 +-
 .../mqtt/cs/channel/ChannelDecodeException.java    | 40 +++++++++++++
 .../rocketmq/mqtt/cs/channel/ConnectHandler.java   |  5 +-
 .../cs/protocol/mqtt/MqttPacketDispatcher.java     |  3 +-
 .../rocketmq/mqtt/cs/test/TestConnectHandler.java  | 66 ++++++++++++++++++++++
 .../rocketmq/mqtt/ds/notify/NotifyManager.java     |  7 +--
 .../ds/upstream/processor/PublishProcessor.java    |  9 ++-
 .../rocketmq/mqtt/ds/test/TestNotifyManager.java   | 26 ++++++++-
 9 files changed, 147 insertions(+), 17 deletions(-)

diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f55afbb..9682bbe 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -29,8 +29,8 @@ public class Constants {
 
     public static final String PROPERTY_NAMESPACE = "namespace";
     public static final String PROPERTY_ORIGIN_MQTT_TOPIC = "originMqttTopic";
-    public static final String PROPERTY_MQTT_QOS = "qoslevel";
-    public static final String PROPERTY_MQTT_CLEAN_SESSION = "cleansessionflag";
+    public static final String PROPERTY_MQTT_QOS = "qosLevel";
+    public static final String PROPERTY_MQTT_CLEAN_SESSION = "cleanSessionFlag";
     public static final String PROPERTY_MQTT_CLIENT = "clientId";
     public static final String PROPERTY_MQTT_RETRY_TIMES = "retryTimes";
     public static final String PROPERTY_MQTT_EXT_DATA = "extData";
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
index b3be5af..563401c 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Message.java
@@ -49,8 +49,8 @@ public class Message {
     public static String propertyUserProperties = "extData";
 
     public static String extPropertyMqttRealTopic = "mqttRealTopic";
-    public static String extPropertyQoS = "qoslevel";
-    public static String extPropertyCleanSessionFlag = "cleansessionflag";
+    public static String extPropertyQoS = "qosLevel";
+    public static String extPropertyCleanSessionFlag = "cleanSessionFlag";
     public static String extPropertyNamespaceId = "namespace";
     public static String extPropertyClientId = "clientId";
 
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java
new file mode 100644
index 0000000..354151c
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelDecodeException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.channel;
+
+
+public class ChannelDecodeException extends RuntimeException { // unchecked exception type; every constructor below only forwards its arguments to the matching RuntimeException constructor
+    public ChannelDecodeException() {} // no detail message, no cause
+
+    public ChannelDecodeException(String message) { // detail message only
+        super(message);
+    }
+
+    public ChannelDecodeException(String message, Throwable cause) { // detail message plus the underlying cause
+        super(message, cause);
+    }
+
+    public ChannelDecodeException(Throwable cause) { // underlying cause only
+        super(cause);
+    }
+
+    public ChannelDecodeException(String message, Throwable cause, boolean enableSuppression,
+        boolean writableStackTrace) { // additionally forwards the suppression and writable-stack-trace flags
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
index a891d0a..32481e3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 
@@ -34,6 +34,8 @@ import java.util.List;
 public class ConnectHandler extends ChannelInboundHandlerAdapter {
     private static Logger logger = LoggerFactory.getLogger(ConnectHandler.class);
 
+    private final List<String> simpleExceptions = Collections.singletonList("Connection reset by peer");
+
     @Resource
     private ChannelManager channelManager;
 
@@ -49,7 +51,6 @@ public class ConnectHandler extends ChannelInboundHandlerAdapter {
         channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "be closed");
     }
 
-    public final List<String> simpleExceptions = Arrays.asList("Connection reset by peer");
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
index 3443d96..5acf48d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/MqttPacketDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.common.hook.UpstreamHookManager;
 import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 import org.apache.rocketmq.mqtt.common.util.HostInfo;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelDecodeException;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelException;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -100,7 +101,7 @@ public class MqttPacketDispatcher extends SimpleChannelInboundHandler<MqttMessag
             return;
         }
         if (!msg.decoderResult().isSuccess()) {
-            throw new RuntimeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
+            throw new ChannelDecodeException(ChannelInfo.getClientId(ctx.channel()) + "," + msg.decoderResult());
         }
         ChannelInfo.touch(ctx.channel());
         CompletableFuture<HookResult> upstreamHookResult;
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java
new file mode 100644
index 0000000..87bb6bf
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestConnectHandler.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestConnectHandler { // exercises ConnectHandler.exceptionCaught against Mockito mocks
+
+    @Mock
+    private ChannelHandlerContext ctx;
+    @Mock
+    private Logger logger; // written over ConnectHandler's "logger" field below so error logging can be verified
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelManager channelManager;
+
+    @Test
+    public void test() throws Exception {
+        ConnectHandler connectHandler=new ConnectHandler();
+        Throwable throwable=new Exception("Connection reset by peer"); // same text as the single entry in ConnectHandler.simpleExceptions
+        when(ctx.channel()).thenReturn(channel);
+        FieldUtils.writeDeclaredField(connectHandler,"channelManager",channelManager,true); // last argument forces access to the non-public field
+        FieldUtils.writeDeclaredField(connectHandler,"logger",logger,true);
+        connectHandler.exceptionCaught(ctx,throwable);
+        verify(logger,never()).error(anyString(),any(),any()); // expectation: a listed message is not logged at error level
+        verify(channelManager).closeConnect(channel,ChannelCloseFrom.SERVER, throwable.getMessage());
+
+        connectHandler.exceptionCaught(ctx,new Exception("test exception")); // message that is not in simpleExceptions
+        verify(logger,only()).error(anyString(),any(),any()); // expectation: this error call is the only interaction with the logger mock
+        verify(channelManager).closeConnect(channel,ChannelCloseFrom.SERVER, throwable.getMessage()); // NOTE(review): same check as after the first call, using the first exception's message; the close for "test exception" does not appear to be asserted - confirm intent
+    }
+}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 01c25af..a6327b8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.mqtt.ds.notify;
 
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -50,7 +50,6 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -269,7 +268,7 @@ public class NotifyManager {
     private RemotingCommand createMsgEventCommand(Set<MessageEvent> messageEvents) {
         RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RpcCode.CMD_NOTIFY_MQTT_MESSAGE,
                 null);
-        remotingCommand.setBody(JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8));
+        remotingCommand.setBody(JSON.toJSONBytes(messageEvents)); // body bytes now come straight from fastjson; testJsonByte in this commit asserts they equal the previous toJSONString(...).getBytes(UTF_8) result
         return remotingCommand;
     }
 
@@ -281,7 +280,7 @@ public class NotifyManager {
         }
         Message message = new Message();
         message.setTopic(serviceConf.getEventNotifyRetryTopic());
-        message.setBody(JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8));
+        message.setBody(JSON.toJSONBytes(messageEvents));
         message.setDelayTimeLevel(delayLevel);
         message.putUserProperty(Constants.PROPERTY_MQTT_MSG_EVENT_RETRY_NODE, node);
         message.putUserProperty(Constants.PROPERTY_MQTT_MSG_EVENT_RETRY_TIME, String.valueOf(retryTime + 1));
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index b01ad56..946f32a 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.mqtt.ds.upstream.processor;
 
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
@@ -36,7 +36,6 @@ import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -66,9 +65,9 @@ public class PublishProcessor implements UpstreamProcessor {
         Message message = MessageUtil.toMessage(mqttPublishMessage);
         message.setMsgId(msgId);
         message.setBornTimestamp(System.currentTimeMillis());
-        CompletableFuture<StoreResult> storeResult = lmqQueueStore.putMessage(queueNames, message);
-        return storeResult.thenCompose(storeResult1 -> HookResult.newHookResult(HookResult.SUCCESS, null,
-                JSONObject.toJSONString(storeResult1).getBytes(StandardCharsets.UTF_8)));
+        CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
+        return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
+                JSON.toJSONBytes(storeResult)));
     }
 
 }
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
index 992e5e7..d5e4a4a 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestNotifyManager.java
@@ -19,12 +19,15 @@
 
 package org.apache.rocketmq.mqtt.ds.test;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
+import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.MessageEvent;
 import org.apache.rocketmq.mqtt.common.model.RpcCode;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
@@ -33,18 +36,24 @@ import org.apache.rocketmq.mqtt.ds.notify.NotifyManager;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Set;
 
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
 @RunWith(MockitoJUnitRunner.class)
 public class TestNotifyManager {
 
@@ -82,9 +91,24 @@ public class TestNotifyManager {
         RemotingCommand response = mock(RemotingCommand.class);
         when(response.getCode()).thenReturn(RpcCode.SUCCESS);
         when(remotingClient.invokeSync(any(), any(), anyLong())).thenReturn(response);
-
         notifyManager.notifyMessage(new HashSet<>(Arrays.asList(new MessageEvent())));
         verify(remotingClient).invokeSync(any(), any(), anyLong());
     }
 
+    @Test
+    public void testJsonByte() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { // compares the body built by createMsgEventCommand with the toJSONString(...).getBytes(UTF_8) encoding
+        Set<MessageEvent> messageEvents=new HashSet<>();
+        for (int i=0;i<10;i++) { // ten events with distinct broker, topic and namespace values
+            MessageEvent messageEvent = new MessageEvent();
+            messageEvent.setBrokerName("testBroker"+i);
+            messageEvent.setPubTopic("testTopic"+i);
+            messageEvent.setNamespace("testSpace"+i);
+            messageEvents.add(messageEvent);
+        }
+        NotifyManager notifyManager = new NotifyManager();
+        RemotingCommand remotingCommand = (RemotingCommand) MethodUtils.invokeMethod(notifyManager,  true, "createMsgEventCommand", messageEvents); // private method, invoked reflectively with forced access
+        byte[] bytes = JSONObject.toJSONString(messageEvents).getBytes(StandardCharsets.UTF_8); // reference encoding of the same set
+        Assert.assertArrayEquals(remotingCommand.getBody(), bytes); // NOTE(review): JUnit's parameter order is (expecteds, actuals); the arguments look swapped, which would only affect the failure message
+    }
+
 }