You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 20:20:36 UTC
activemq-artemis git commit: fix mqtt
Repository: activemq-artemis
Updated Branches:
refs/heads/artemis-1009 9cbff99cd -> 5f01fc59b
fix mqtt
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f01fc59
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f01fc59
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f01fc59
Branch: refs/heads/artemis-1009
Commit: 5f01fc59b0ca2bd954ad303a8796f2df261b0491
Parents: 9cbff99
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 1 15:02:22 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 1 15:20:11 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/core/message/impl/CoreMessage.java | 1 +
.../artemis/core/protocol/mqtt/MQTTPublishManager.java | 10 +++++++---
.../artemis/core/protocol/mqtt/MQTTSessionCallback.java | 2 +-
.../activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 +---
4 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 973c1de..0da9f03 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -243,6 +243,7 @@ public class CoreMessage extends RefCountMessage {
@Override
public Message copy() {
+ checkEncode();
return new CoreMessage(this);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e619eb9..67ef258 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
/**
* Handles MQTT Exactly Once (QoS level 2) Protocol.
*/
public class MQTTPublishManager {
+ private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
+
private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
private SimpleString managementAddress;
@@ -173,6 +176,7 @@ public class MQTTPublishManager {
}
tx.commit();
} catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
tx.rollback();
throw t;
}
@@ -253,17 +257,17 @@ public class MQTTPublishManager {
switch (message.getType()) {
case Message.TEXT_TYPE:
try {
- SimpleString text = message.getBodyBuffer().readNullableSimpleString();
+ SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
byte[] stringPayload = text.toString().getBytes("UTF-8");
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
payload.writeBytes(stringPayload);
break;
} catch (UnsupportedEncodingException e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
default:
ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
- payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
+ payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
break;
}
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index b997d80..a5b908f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback {
try {
session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
} catch (Exception e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
return 1;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f01fc59/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 6891497..e7b8c50 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -114,8 +113,7 @@ public class MQTTUtil {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
- // FIXME does this involve a copy?
- message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+ message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
return message;
}