You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 20:20:36 UTC

activemq-artemis git commit: fix mqtt

Repository: activemq-artemis
Updated Branches:
  refs/heads/artemis-1009 9cbff99cd -> 5f01fc59b


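Fix MQTT: call checkEncode() before copying a CoreMessage, read outgoing payloads through the read-only body buffer, write incoming payloads directly into the message body buffer, and log the full exception when a send or transaction fails.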


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f01fc59
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f01fc59
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f01fc59

Branch: refs/heads/artemis-1009
Commit: 5f01fc59b0ca2bd954ad303a8796f2df261b0491
Parents: 9cbff99
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 1 15:02:22 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 1 15:20:11 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/message/impl/CoreMessage.java   |  1 +
 .../artemis/core/protocol/mqtt/MQTTPublishManager.java    | 10 +++++++---
 .../artemis/core/protocol/mqtt/MQTTSessionCallback.java   |  2 +-
 .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java     |  4 +---
 4 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 973c1de..0da9f03 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -243,6 +243,7 @@ public class CoreMessage extends RefCountMessage {
 
    @Override
    public Message copy() {
+      checkEncode();
       return new CoreMessage(this);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e619eb9..67ef258 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
 
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
  */
 public class MQTTPublishManager {
 
+   private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
+
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
    private SimpleString managementAddress;
@@ -173,6 +176,7 @@ public class MQTTPublishManager {
                }
                tx.commit();
             } catch (Throwable t) {
+               logger.warn(t.getMessage(), t);
                tx.rollback();
                throw t;
             }
@@ -253,17 +257,17 @@ public class MQTTPublishManager {
       switch (message.getType()) {
          case Message.TEXT_TYPE:
             try {
-               SimpleString text = message.getBodyBuffer().readNullableSimpleString();
+               SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
                byte[] stringPayload = text.toString().getBytes("UTF-8");
                payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                payload.writeBytes(stringPayload);
                break;
             } catch (UnsupportedEncodingException e) {
-               log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+               log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
             }
          default:
             ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
-            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
+            payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index b997d80..a5b908f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback {
       try {
          session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
       } catch (Exception e) {
-         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
       }
       return 1;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 6891497..e7b8c50 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
@@ -114,8 +113,7 @@ public class MQTTUtil {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
       Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
 
-      // FIXME does this involve a copy?
-      message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+      message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
    }