You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/22 16:01:53 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1244 propagate retain flag of received message
Repository: activemq-artemis
Updated Branches:
refs/heads/master c2ad9cab0 -> be8eb3ec9
ARTEMIS-1244 propagate retain flag of received message
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/60fad35c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/60fad35c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/60fad35c
Branch: refs/heads/master
Commit: 60fad35cfe74eb6cd40d79adf56621f022e79416
Parents: c2ad9ca
Author: Jiri Danek <jd...@redhat.com>
Authored: Wed Jun 21 17:51:56 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jun 22 12:00:40 2017 -0400
----------------------------------------------------------------------
.../artemis/core/protocol/mqtt/MQTTProtocolHandler.java | 4 ++--
.../activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 ++++-
.../apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 ++--
.../mqtt/imported/MQTTInterceptorPropertiesTest.java | 1 -
4 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 4edce31..6add2db 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -291,9 +291,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
disconnect(false);
}
- protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) {
+ protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
- MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
+ MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
this.protocolManager.invokeOutgoing(publish, connection);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 8dfaf34..5da027a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -35,6 +35,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
+import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
+
/**
* Handles MQTT Exactly Once (QoS level 2) Protocol.
*/
@@ -256,6 +258,7 @@ public class MQTTPublishManager {
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
+ boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
ByteBuf payload;
switch (message.getType()) {
@@ -274,7 +277,7 @@ public class MQTTPublishManager {
payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
break;
}
- session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
+ session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);
}
private int decideQoS(Message message, ServerConsumer consumer) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 0e09fb0..76664f6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -61,7 +61,7 @@ public class MQTTUtil {
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
- public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
+ public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
@@ -102,7 +102,7 @@ public class MQTTUtil {
CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
message.setAddress(address);
- message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
+ message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
message.setType(Message.BYTES_TYPE);
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 0b62194..375e2f2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -64,7 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
MqttFixedHeader header = message.fixedHeader();
assertNotNull(header.messageType());
assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
- // TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0)
assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
} catch (Throwable t) {
collector.addError(t);
[2/2] activemq-artemis git commit: This closes #1358
Posted by cl...@apache.org.
This closes #1358
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be8eb3ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be8eb3ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be8eb3ec
Branch: refs/heads/master
Commit: be8eb3ec9fcc6f4849e92a056fb1a7c06327948d
Parents: c2ad9ca 60fad35
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jun 22 12:01:41 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jun 22 12:01:41 2017 -0400
----------------------------------------------------------------------
.../artemis/core/protocol/mqtt/MQTTProtocolHandler.java | 4 ++--
.../activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 ++++-
.../apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 ++--
.../mqtt/imported/MQTTInterceptorPropertiesTest.java | 1 -
4 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------