You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/03/26 01:19:09 UTC
[rocketmq] branch mqtt updated: Implementation of handling MQTT
client PINGREQ
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch mqtt
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/mqtt by this push:
new 36c2174 Implementation of handling MQTT client PINGREQ
new b216965 Merge pull request #1116 from Aaron-He/mqtt
36c2174 is described below
commit 36c21740ff139dfa63f85518a0c5545e5bbaa65a
Author: HeChengyang <21...@seu.edu.cn>
AuthorDate: Mon Mar 25 17:58:49 2019 +0800
Implementation of handling MQTT client PINGREQ
---
.../impl/MqttPingreqMessageHandler.java | 28 +++++++-
.../processor/DefaultMqttMessageProcessor.java | 1 +
.../mqtt/MqttPingreqMessageHandlerTest.java | 77 ++++++++++++++++++++++
.../mqtt/dispatcher/EncodeDecodeDispatcher.java | 4 +-
.../mqtt/dispatcher/MqttPingReqEncodeDecode.java | 45 +++++++++++++
.../mqtt/dispatcher/MqttPingRespEncodeDecode.java | 43 ++++++++++++
6 files changed, 195 insertions(+), 3 deletions(-)
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
index d867476..dd1972d 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
@@ -17,14 +17,22 @@
package org.apache.rocketmq.mqtt.mqtthandler.impl;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import org.apache.rocketmq.common.client.Client;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
public class MqttPingreqMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
@@ -43,6 +51,24 @@ public class MqttPingreqMessageHandler implements MessageHandler {
*/
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
-        return null;
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
+        Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
+        // The client is null-checked below, so it may be absent here: guard the id lookup used
+        // for logging instead of dereferencing it unconditionally (which would throw an NPE
+        // before the error response could be built).
+        log.debug("Handle MQTT client: {} Pingreq.", client == null ? null : client.getClientId());
+        RemotingCommand response = RemotingCommand.createResponseCommand(MqttHeader.class);
+        if (client != null && client.isConnected()) {
+            // Record the time of this PINGREQ as the client's last update.
+            client.setLastUpdateTimestamp(System.currentTimeMillis());
+            // Fill the response's MQTT header so it describes a PINGRESP with no flags and no payload.
+            MqttHeader mqttHeader = (MqttHeader) response.readCustomHeader();
+            mqttHeader.setMessageType(MqttMessageType.PINGRESP.value());
+            mqttHeader.setDup(false);
+            mqttHeader.setQosLevel(0);
+            mqttHeader.setRetain(false);
+            mqttHeader.setRemainingLength(0);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+        // No client registered for this channel, or the client is not connected: report an error.
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("MQTT Client is null or not connected");
+        return response;
}
}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
index 563fdac..cb28314 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
@@ -132,6 +132,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
break;
case UNSUBSCRIBE:
case PINGREQ:
+ break;
case DISCONNECT:
}
return type2handler.get(MqttMessageType.valueOf(mqttHeader.getMessageType())).handleMessage(mqttMessage, remotingChannel);
diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
new file mode 100644
index 0000000..204cb8b
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPingreqMessageHandlerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link MqttPingreqMessageHandler#handleMessage}, using Mockito mocks for the
+ * processor, client manager, client, channel and incoming message.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class MqttPingreqMessageHandlerTest {
+    @Mock
+    private RemotingChannel remotingChannel;
+    @Mock
+    private IOTClientManagerImpl iotClientManager;
+    @Mock
+    private MqttMessage mqttMessage;
+    @Mock
+    private Client client;
+    @Mock
+    private DefaultMqttMessageProcessor processor;
+
+    private MqttPingreqMessageHandler mqttPingreqMessageHandler;
+
+    /** Wires the handler to a mocked processor whose client manager resolves the channel to the mocked client. */
+    @Before
+    public void init() {
+        mqttPingreqMessageHandler = new MqttPingreqMessageHandler(processor);
+        when(processor.getIotClientManager()).thenReturn(iotClientManager);
+        when(iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel)).thenReturn(client);
+        when(client.getClientId()).thenReturn("Mock Client");
+    }
+
+    /** A connected client gets a SUCCESS response and has its last-update timestamp refreshed. */
+    @Test
+    public void testHandlerMessageReturnResp() {
+        when(client.isConnected()).thenReturn(true);
+        RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel);
+        verify(client).setLastUpdateTimestamp(anyLong());
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+    }
+
+    /** A disconnected client gets a SYSTEM_ERROR response and its timestamp is left untouched. */
+    @Test
+    public void testHandlerMessageReturnSystemError() {
+        when(client.isConnected()).thenReturn(false);
+        RemotingCommand response = mqttPingreqMessageHandler.handleMessage(mqttMessage, remotingChannel);
+        verify(client, never()).setLastUpdateTimestamp(anyLong());
+        assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
index 2c17f91..3415e3a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/EncodeDecodeDispatcher.java
@@ -38,8 +38,8 @@ public class EncodeDecodeDispatcher {
encodeDecodeDispatcher.put(MqttMessageType.SUBACK, new MqttSubackEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBSCRIBE, new MqttUnSubscribeEncodeDecode());
encodeDecodeDispatcher.put(MqttMessageType.UNSUBACK, new MqttUnSubackEncodeDecode());
- encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, null);
- encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, null);
+ encodeDecodeDispatcher.put(MqttMessageType.PINGREQ, new MqttPingReqEncodeDecode() );
+ encodeDecodeDispatcher.put(MqttMessageType.PINGRESP, new MqttPingRespEncodeDecode());
}
public static Map<MqttMessageType, Message2MessageEncodeDecode> getEncodeDecodeDispatcher() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
new file mode 100644
index 0000000..569a0c9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingReqEncodeDecode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+
+/**
+ * Encode/decode pair registered for PINGREQ messages; only {@link #decode} does any work.
+ */
+public class MqttPingReqEncodeDecode implements Message2MessageEncodeDecode {
+    /**
+     * Copies the fixed-header fields of the given MQTT message into a new {@link MqttHeader}
+     * and wraps that header in a request command.
+     *
+     * @param mqttMessage the MQTT message whose fixed header is read
+     * @return a request command built from the populated header
+     */
+    @Override
+    public RemotingCommand decode(MqttMessage mqttMessage) {
+        final MqttHeader header = new MqttHeader();
+        final MqttFixedHeader fixed = mqttMessage.fixedHeader();
+        header.setRemainingLength(fixed.remainingLength());
+        header.setRetain(fixed.isRetain());
+        header.setQosLevel(fixed.qosLevel().value());
+        header.setDup(fixed.isDup());
+        header.setMessageType(fixed.messageType().value());
+        // NOTE(review): 1000 is an unexplained request code here; confirm its meaning with the remoting layer.
+        final RemotingCommand request = RemotingCommand.createRequestCommand(1000, header);
+        return request;
+    }
+
+    /**
+     * Not implemented for this type: always returns {@code null}.
+     *
+     * @param remotingCommand ignored
+     * @return {@code null}
+     */
+    @Override
+    public MqttMessage encode(
+        RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
+        return null;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
new file mode 100644
index 0000000..df7431b
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/mqtt/dispatcher/MqttPingRespEncodeDecode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.mqtt.dispatcher;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+
+/**
+ * Encode/decode pair registered for PINGRESP messages; only {@link #encode} does any work.
+ */
+public class MqttPingRespEncodeDecode implements Message2MessageEncodeDecode {
+    /**
+     * Not implemented for this type: always returns {@code null}.
+     *
+     * @param mqttMessage ignored
+     * @return {@code null}
+     */
+    @Override
+    public RemotingCommand decode(MqttMessage mqttMessage) {
+        return null;
+    }
+
+    /**
+     * Builds a PINGRESP {@link MqttMessage} consisting only of a fixed header. The message type
+     * is always PINGRESP; the dup flag, QoS level, retain flag and remaining length are taken
+     * from the command's {@link MqttHeader}.
+     *
+     * @param remotingCommand command whose custom header is cast to {@link MqttHeader}
+     * @return the MQTT message wrapping the constructed fixed header
+     */
+    @Override
+    public MqttMessage encode(
+        RemotingCommand remotingCommand) throws RemotingCommandException, UnsupportedEncodingException {
+        final MqttHeader header = (MqttHeader) remotingCommand.getCustomHeader();
+        final MqttFixedHeader fixedHeader = new MqttFixedHeader(
+            MqttMessageType.PINGRESP,
+            header.isDup(),
+            MqttQoS.valueOf(header.getQosLevel()),
+            header.isRetain(),
+            header.getRemainingLength());
+        return new MqttMessage(fixedHeader);
+    }
+}
+