You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/08/17 15:11:32 UTC
[1/2] activemq-artemis git commit: This closes #1464
Repository: activemq-artemis
Updated Branches:
refs/heads/master 9cdff41da -> 952d835cb
This closes #1464
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/952d835c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/952d835c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/952d835c
Branch: refs/heads/master
Commit: 952d835cb574e37e65c42644453b24ce2b572282
Parents: 9cdff41 654ea69
Author: Justin Bertram <jb...@apache.org>
Authored: Thu Aug 17 10:11:21 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Aug 17 10:11:21 2017 -0500
----------------------------------------------------------------------
.../core/protocol/mqtt/MQTTProtocolHandler.java | 25 ++++++++++++++++++++
.../mqtt/example/InterceptorExample.java | 6 ++---
.../mqtt/example/SimpleMQTTInterceptor.java | 9 +++++++
.../main/resources/activemq/server0/broker.xml | 4 ++++
4 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: Implement support for intercepting
additional MQTT control packets
Posted by jb...@apache.org.
Implement support for intercepting additional MQTT control packets
Previously, only the PUBLISH packet was intercepted. This patch modifies
the code to add support for the other incoming/outgoing MQTT control
packets.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/654ea69e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/654ea69e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/654ea69e
Branch: refs/heads/master
Commit: 654ea69e78a013ee6eef1fe5f4ddd6438dc2e7aa
Parents: 9cdff41
Author: Otavio R. Piske <an...@gmail.com>
Authored: Mon Aug 14 19:12:39 2017 +0200
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Aug 17 10:11:21 2017 -0500
----------------------------------------------------------------------
.../core/protocol/mqtt/MQTTProtocolHandler.java | 25 ++++++++++++++++++++
.../mqtt/example/InterceptorExample.java | 6 ++---
.../mqtt/example/SimpleMQTTInterceptor.java | 9 +++++++
.../main/resources/activemq/server0/broker.xml | 4 ++++
4 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 7c14403..0c0be01 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -172,6 +172,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
+
+ this.protocolManager.invokeIncoming(connect, this.connection);
}
void disconnect(boolean error) {
@@ -183,6 +185,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
+ this.protocolManager.invokeOutgoing(message, this.connection);
ctx.write(message);
ctx.flush();
}
@@ -225,30 +228,43 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
false, 0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
+
+ this.protocolManager.invokeOutgoing(rel, this.connection);
+
ctx.write(rel);
ctx.flush();
}
void handlePuback(MqttPubAckMessage message) throws Exception {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
}
void handlePubrec(MqttMessage message) throws Exception {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRec(messageId);
}
void handlePubrel(MqttMessage message) {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRel(messageId);
}
void handlePubcomp(MqttMessage message) throws Exception {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubComp(messageId);
}
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
@@ -264,6 +280,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
@@ -273,10 +291,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handleUnsuback(MqttUnsubAckMessage message) {
+ this.protocolManager.invokeOutgoing(message, this.connection);
+
disconnect(true);
}
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
ctx.write(pingResp);
@@ -288,6 +310,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handleDisconnect(MqttMessage message) {
+ this.protocolManager.invokeIncoming(message, this.connection);
+
disconnect(false);
}
@@ -296,6 +320,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+
this.protocolManager.invokeOutgoing(publish, connection);
MQTTUtil.logMessage(session.getSessionState(), publish, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
index 5926553..4fb5abf 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
@@ -41,20 +41,20 @@ public class InterceptorExample {
System.out.println("Connected to Artemis");
// Subscribe to a topic
- Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)};
+ Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)};
connection.subscribe(topics);
System.out.println("Subscribed to topics.");
// Publish message
String payload1 = "This is message 1";
- connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE, false);
System.out.println("Sent message");
// Receive the sent message
Message message1 = connection.receive(5, TimeUnit.SECONDS);
-
+
String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8);
System.out.println("Received message: " + messagePayload);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
index 677328c..1b7b482 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.mqtt.example;
import java.nio.charset.Charset;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
System.out.println("MQTT Interceptor gets called ");
+ System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());
+ // If you need to handle a specific packet type:
if (mqttMessage instanceof MqttPublishMessage) {
MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
@@ -49,6 +52,12 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
message.payload().setBytes(0, modifiedMessage.getBytes());
}
+ else {
+ if (mqttMessage instanceof MqttConnectMessage) {
+ MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
+ System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage);
+ }
+ }
// We return true which means "call next interceptor" (if there is one) or target.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
index f93a404..9318e0c 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
@@ -39,6 +39,10 @@ under the License.
<class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
</remoting-incoming-interceptors>
+ <remoting-outgoing-interceptors>
+ <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
+ </remoting-outgoing-interceptors>
+
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>