You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/01/28 12:01:05 UTC
[1/2] activemq-artemis git commit: Fixing ServerMessage's copy and
MQTT delivery
Repository: activemq-artemis
Updated Branches:
refs/heads/master f149e76b3 -> 0ad32aadf
Fixing ServerMessage's copy and MQTT delivery
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/78d2fe7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/78d2fe7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/78d2fe7f
Branch: refs/heads/master
Commit: 78d2fe7f28b88b75331a26d17f7bd554c1f7f3f1
Parents: 53044de
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jan 27 16:17:13 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 27 16:52:28 2016 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 2 +-
.../impl/ResetLimitWrappedActiveMQBuffer.java | 9 +++++++-
.../artemis/core/message/impl/MessageImpl.java | 24 +++++++++++++-------
.../proton/converter/jms/ServerJMSMessage.java | 2 +-
.../core/protocol/mqtt/MQTTProtocolHandler.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 4 ++--
.../openwire/OpenWireMessageConverter.java | 2 +-
.../stomp/VersionedStompFrameHandler.java | 2 +-
.../impl/openmbean/OpenTypeSupport.java | 2 +-
.../core/server/impl/ServerMessageImpl.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 2 +-
.../integration/client/AcknowledgeTest.java | 2 +-
12 files changed, 35 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index f5cb55b..5bb6f42 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -248,7 +248,7 @@ public interface Message {
* Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
* of this buffer should not impact the underlying buffer.
*/
- ActiveMQBuffer getBodyBufferCopy();
+ ActiveMQBuffer getBodyBufferDuplicate();
// Properties
// -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 61ecef6..ec6cf09 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.buffers.impl;
import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@@ -45,7 +46,13 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
// a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
- super(unwrap(buffer.byteBuf()).duplicate());
+ this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
+ }
+
+ public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
+ // a wrapped inside a wrapper will increase the stack size.
+ // we fixed this here due to some profiling testing
+ super(buffer);
this.limit = limit;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index ca52621..7583ce2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -21,12 +21,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -147,16 +149,20 @@ public abstract class MessageImpl implements MessageInternal {
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
// many subscriptions and bridging to other nodes in a cluster
synchronized (other) {
- bufferValid = other.bufferValid;
- endOfBodyPosition = other.endOfBodyPosition;
+ bufferValid = false;
+ endOfBodyPosition = -1;
endOfMessagePosition = other.endOfMessagePosition;
if (other.buffer != null) {
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
// properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.writerIndex());
+ buffer = other.buffer.copy(0, other.buffer.capacity());
buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
+
+ bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+ bodyBuffer.readerIndex(BODY_OFFSET);
+ bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
}
}
}
@@ -267,14 +273,16 @@ public abstract class MessageImpl implements MessageInternal {
}
@Override
- public synchronized ActiveMQBuffer getBodyBufferCopy() {
- // Must copy buffer before sending it
+ public synchronized ActiveMQBuffer getBodyBufferDuplicate() {
- ActiveMQBuffer newBuffer = buffer.copy(0, buffer.capacity());
+ // Must copy buffer before sending it
- newBuffer.setIndex(0, getEndOfBodyPosition());
+ ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf());
+ byteBuf = byteBuf.duplicate();
+ byteBuf.writerIndex(getBodyBuffer().writerIndex());
+ byteBuf.readerIndex(getBodyBuffer().readerIndex());
- return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null);
+ return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
index 1686ea7..7902fa0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -52,7 +52,7 @@ public class ServerJMSMessage implements Message {
protected ActiveMQBuffer getReadBodyBuffer() {
if (readBodyBuffer == null) {
// to avoid clashes between multiple threads
- readBodyBuffer = message.getBodyBufferCopy();
+ readBodyBuffer = message.getBodyBufferDuplicate();
}
return readBodyBuffer;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index c38593c..57c2b57 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -144,7 +144,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
}
catch (Exception e) {
- log.debug("Error processing Control Packet, Disconnecting Client" + e.getMessage());
+ log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage());
disconnect();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index ac40420..b0df5a2 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -216,8 +216,8 @@ public class MQTTPublishManager {
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
- //FIXME should we be copying the body buffer here?
- ByteBuf payload = message.getBodyBufferCopy().byteBuf();
+ ByteBuf payload = message.getBodyBufferDuplicate().byteBuf();
+
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6d3ff13..2b863c1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -492,7 +492,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
+ ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 1e0206a..5da8574 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -293,7 +293,7 @@ public abstract class VersionedStompFrameHandler {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
- ActiveMQBuffer buffer = serverMessage.getBodyBufferCopy();
+ ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index 1f3a109..7bb3764 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -135,7 +135,7 @@ public final class OpenTypeSupport {
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
- ActiveMQBuffer bodyCopy = m.getBodyBufferCopy();
+ ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
byte[] bytes = new byte[bodyCopy.readableBytes()];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, bytes);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
index 645d326..e7a7e67 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
@@ -293,7 +293,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
@Override
public String toString() {
- return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferCopy().capacity() +
+ return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index e3d6850..c54881d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -570,7 +570,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public ActiveMQBuffer getBodyBufferCopy() {
+ public ActiveMQBuffer getBodyBufferDuplicate() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/78d2fe7f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 6067f94..476f191 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -437,7 +437,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public ActiveMQBuffer getBodyBufferCopy() {
+ public ActiveMQBuffer getBodyBufferDuplicate() {
return null;
}
[2/2] activemq-artemis git commit: Merge #357
Posted by ma...@apache.org.
Merge #357
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0ad32aad
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0ad32aad
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0ad32aad
Branch: refs/heads/master
Commit: 0ad32aadf02f38c96f5579743657a99c8f5b087c
Parents: f149e76 78d2fe7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Jan 28 10:59:30 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Jan 28 10:59:30 2016 +0000
----------------------------------------------------------------------
.../org/apache/activemq/artemis/core/message/impl/MessageImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0ad32aad/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------