You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/22 16:01:53 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1244 propagate retain flag of received message

Repository: activemq-artemis
Updated Branches:
  refs/heads/master c2ad9cab0 -> be8eb3ec9


ARTEMIS-1244 propagate retain flag of received message


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/60fad35c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/60fad35c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/60fad35c

Branch: refs/heads/master
Commit: 60fad35cfe74eb6cd40d79adf56621f022e79416
Parents: c2ad9ca
Author: Jiri Danek <jd...@redhat.com>
Authored: Wed Jun 21 17:51:56 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jun 22 12:00:40 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/mqtt/MQTTProtocolHandler.java         | 4 ++--
 .../activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 ++++-
 .../apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java    | 4 ++--
 .../mqtt/imported/MQTTInterceptorPropertiesTest.java            | 1 -
 4 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 4edce31..6add2db 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -291,9 +291,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       disconnect(false);
    }
 
-   protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) {
+   protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
       boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
-      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
       MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
       this.protocolManager.invokeOutgoing(publish, connection);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 8dfaf34..5da027a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -35,6 +35,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
+
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
  */
@@ -256,6 +258,7 @@ public class MQTTPublishManager {
 
    private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
       String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
+      boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
 
       ByteBuf payload;
       switch (message.getType()) {
@@ -274,7 +277,7 @@ public class MQTTPublishManager {
             payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
             break;
       }
-      session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
+      session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);
    }
 
    private int decideQoS(Message message, ServerConsumer consumer) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 0e09fb0..76664f6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -61,7 +61,7 @@ public class MQTTUtil {
 
    public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
 
-   public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
+   public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
 
    public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
@@ -102,7 +102,7 @@ public class MQTTUtil {
 
       CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
       message.setAddress(address);
-      message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
+      message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
       message.setType(Message.BYTES_TYPE);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60fad35c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 0b62194..375e2f2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -64,7 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
          MqttFixedHeader header = message.fixedHeader();
          assertNotNull(header.messageType());
          assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
-         // TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0)
          assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
       } catch (Throwable t) {
          collector.addError(t);


[2/2] activemq-artemis git commit: This closes #1358

Posted by cl...@apache.org.
This closes #1358


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be8eb3ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be8eb3ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be8eb3ec

Branch: refs/heads/master
Commit: be8eb3ec9fcc6f4849e92a056fb1a7c06327948d
Parents: c2ad9ca 60fad35
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jun 22 12:01:41 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jun 22 12:01:41 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/mqtt/MQTTProtocolHandler.java         | 4 ++--
 .../activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 ++++-
 .../apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java    | 4 ++--
 .../mqtt/imported/MQTTInterceptorPropertiesTest.java            | 1 -
 4 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------