You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/08/17 15:11:33 UTC

[2/2] activemq-artemis git commit: Implement support for intercepting additional MQTT control packets

Implement support for intercepting additional MQTT control packets

Previously, only the PUBLISH packet was intercepted. This patch modifies
the code to add support for the other incoming/outgoing MQTT control
packets.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/654ea69e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/654ea69e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/654ea69e

Branch: refs/heads/master
Commit: 654ea69e78a013ee6eef1fe5f4ddd6438dc2e7aa
Parents: 9cdff41
Author: Otavio R. Piske <an...@gmail.com>
Authored: Mon Aug 14 19:12:39 2017 +0200
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Aug 17 10:11:21 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolHandler.java | 25 ++++++++++++++++++++
 .../mqtt/example/InterceptorExample.java        |  6 ++---
 .../mqtt/example/SimpleMQTTInterceptor.java     |  9 +++++++
 .../main/resources/activemq/server0/broker.xml  |  4 ++++
 4 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 7c14403..0c0be01 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -172,6 +172,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
       String clientId = connect.payload().clientIdentifier();
       session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
+
+      this.protocolManager.invokeIncoming(connect, this.connection);
    }
 
    void disconnect(boolean error) {
@@ -183,6 +185,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
       MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
 
+      this.protocolManager.invokeOutgoing(message, this.connection);
       ctx.write(message);
       ctx.flush();
    }
@@ -225,30 +228,43 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
                                                         false, 0);
       MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
+
+      this.protocolManager.invokeOutgoing(rel, this.connection);
+
       ctx.write(rel);
       ctx.flush();
    }
 
    void handlePuback(MqttPubAckMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
    }
 
    void handlePubrec(MqttMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubRec(messageId);
    }
 
    void handlePubrel(MqttMessage message) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubRel(messageId);
    }
 
    void handlePubcomp(MqttMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubComp(messageId);
    }
 
    void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
       int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
 
@@ -264,6 +280,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
       MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
@@ -273,10 +291,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    void handleUnsuback(MqttUnsubAckMessage message) {
+      this.protocolManager.invokeOutgoing(message, this.connection);
+
       disconnect(true);
    }
 
    void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
       MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
       ctx.write(pingResp);
@@ -288,6 +310,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    void handleDisconnect(MqttMessage message) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       disconnect(false);
    }
 
@@ -296,6 +320,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
       MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+
       this.protocolManager.invokeOutgoing(publish, connection);
 
       MQTTUtil.logMessage(session.getSessionState(), publish, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
index 5926553..4fb5abf 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
@@ -41,20 +41,20 @@ public class InterceptorExample {
       System.out.println("Connected to Artemis");
 
       // Subscribe to a topic
-      Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)};
+      Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)};
       connection.subscribe(topics);
       System.out.println("Subscribed to topics.");
 
       // Publish message
       String payload1 = "This is message 1";
 
-      connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+      connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE, false);
 
       System.out.println("Sent message");
 
       // Receive the sent message
       Message message1 = connection.receive(5, TimeUnit.SECONDS);
-
+      
       String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8);
 
       System.out.println("Received message: " + messagePayload);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
index 677328c..1b7b482 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.mqtt.example;
 import java.nio.charset.Charset;
 
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 
 
@@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
    public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
       System.out.println("MQTT Interceptor gets called ");
 
+      System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());
 
+      // If you need to handle a specific packet type:
       if (mqttMessage instanceof MqttPublishMessage) {
          MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
 
@@ -49,6 +52,12 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
 
          message.payload().setBytes(0, modifiedMessage.getBytes());
       }
+      else {
+         if (mqttMessage instanceof MqttConnectMessage) {
+            MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
+            System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage);
+         }
+      }
 
 
       // We return true which means "call next interceptor" (if there is one) or target.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
index f93a404..9318e0c 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
@@ -39,6 +39,10 @@ under the License.
          <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
       </remoting-incoming-interceptors>
 
+      <remoting-outgoing-interceptors>
+         <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
+      </remoting-outgoing-interceptors>
+
       <bindings-directory>./data/bindings</bindings-directory>
 
       <journal-directory>./data/journal</journal-directory>