You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/03/26 01:19:09 UTC

[rocketmq] branch mqtt updated: Implementation of handling MQTT client PINGREQ

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch mqtt
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/mqtt by this push:
     new 36c2174  Implementation of handling MQTT client PINGREQ
     new b216965  Merge pull request #1116 from Aaron-He/mqtt
36c2174 is described below

commit 36c21740ff139dfa63f85518a0c5545e5bbaa65a
Author: HeChengyang <21...@seu.edu.cn>
AuthorDate: Mon Mar 25 17:58:49 2019 +0800

    Implementation of handling MQTT client PINGREQ
---
 .../impl/MqttPingreqMessageHandler.java            | 28 +++++++-
 .../processor/DefaultMqttMessageProcessor.java     |  1 +
 .../mqtt/MqttPingreqMessageHandlerTest.java        | 77 ++++++++++++++++++++++
 .../mqtt/dispatcher/EncodeDecodeDispatcher.java    |  4 +-
 .../mqtt/dispatcher/MqttPingReqEncodeDecode.java   | 45 +++++++++++++
 .../mqtt/dispatcher/MqttPingRespEncodeDecode.java  | 43 ++++++++++++
 6 files changed, 195 insertions(+), 3 deletions(-)

diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
index d867476..dd1972d 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
@@ -17,14 +17,22 @@
 
 package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import org.apache.rocketmq.common.client.Client;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
 import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
 import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
 
 public class MqttPingreqMessageHandler implements MessageHandler {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
@@ -43,6 +51,24 @@ public class MqttPingreqMessageHandler implements MessageHandler {
      */
     @Override
     public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
-        return null;
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
+        Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
+        log.debug("Handle MQTT client: {} Pingreq.", client.getClientId());
+        RemotingCommand response = RemotingCommand.createResponseCommand(MqttHeader.class);
+        if (client != null && client.isConnected()) {
+            client.setLastUpdateTimestamp(System.currentTimeMillis());
+            MqttHeader mqttHeader = (MqttHeader) response.readCustomHeader();
+            mqttHeader.setMessageType(MqttMessageType.PINGRESP.value());
+            mqttHeader.setDup(false);
+            mqttHeader.setQosLevel(0);
+            mqttHeader.setRetain(false);
+            mqttHeader.setRemainingLength(0);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("MQTT Client is null or not connected");
+        return response;
     }
 }
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
index 563fdac..cb28314 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
@@ -132,6 +132,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
                 break;
             case UNSUBSCRIBE:
             case PINGREQ:
+                break;
             case DISCONNECT:
         }
         return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
new file mode 100644
index 0000000..204cb8b
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MqttPingreqMessageHandlerTest {
+    @Mock
+    private RemotingChannel remotingChannel;
+    @Mock
+    private IOTClientManagerImpl iotClientManager;
+    @Mock
+    private MqttMessage mqttMessage;
+    @Mock
+    private Client client;
+    @Mock
+    private DefaultMqttMessageProcessor processor;
+
+    private MqttPingreqMessageHandler mqttPingreqMessageHandler;
+
+    @Before
+    public void init() {
+        mqttPingreqMessageHandler = new MqttPingreqMessageHandler(processor);
+        when(processor.getIotClientManager()).thenReturn(iotClientManager);
+        when(iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel)).thenReturn(client);
+        when(client.getClientId()).thenReturn("Mock Client");
+    }
+
+    @Test
+    public void testHandlerMessageReturnResp() {
+        when(client.isConnected()).thenReturn(true);
+        RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel);
+        verify(client).setLastUpdateTimestamp(anyLong());
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+    }
+
+    @Test
+    public void testHandlerMessageReturnNull() {
+        when(client.isConnected()).thenReturn(false);
+        RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel);
+        assertEquals(ResponseCode.SYSTEM_ERROR,response.getCode());
+
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
index 2c17f91..3415e3a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
@@ -38,8 +38,8 @@ public class EncodeDecodeDispatcher {
         encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode());
         encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode());
         encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode());
-        encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
-        encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
+        encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, new MqttPingReqEncodeDecode() );
+        encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, new MqttPingRespEncodeDecode());
     }
 
     public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
new file mode 100644
index 0000000..569a0c9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+
+public class MqttPingReqEncodeDecode implements Message2MessageEncodeDecode {
+    @Override
+    public RemotingCommand decode(MqttMessage mqttMessage) {
+        MqttHeader mqttHeader = new MqttHeader();
+        MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
+        mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
+        mqttHeader.setDup(mqttFixedHeader.isDup());
+        mqttHeader.setQosLevel(mqttFixedHeader.qosLevel().value());
+        mqttHeader.setRetain(mqttFixedHeader.isRetain());
+        mqttHeader.setRemainingLength(mqttFixedHeader.remainingLength());
+        return RemotingCommand.createRequestCommand(1000, mqttHeader);
+    }
+
+    @Override
+    public MqttMessage encode(
+        RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
+        return null;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
new file mode 100644
index 0000000..df7431b
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+
+public class MqttPingRespEncodeDecode implements Message2MessageEncodeDecode {
+    @Override
+    public RemotingCommand decode(MqttMessage mqttMessage) {
+        return null;
+    }
+
+    @Override
+    public MqttMessage encode(
+        RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
+        MqttHeader mqttHeader = (MqttHeader) remotingCommand.getCustomHeader();
+        return new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(), mqttHeader.getRemainingLength()));
+
+    }
+}
+