You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/10/18 22:18:48 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2136 synchronize copy constructor
Repository: activemq-artemis
Updated Branches:
refs/heads/master f70075a78 -> ef7ff38de
ARTEMIS-2136 synchronize copy constructor
A synchronization block was missed during the AMQP refactoring.
This could impact use-cases involving diverts, cluster bridges, and
multiple concurrent consumers.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/175d77a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/175d77a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/175d77a5
Branch: refs/heads/master
Commit: 175d77a5b78023a44c9ce140e39ea282fe9da7c9
Parents: f70075a
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 18 17:18:45 2018 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Oct 18 16:53:07 2018 -0500
----------------------------------------------------------------------
.../artemis/core/message/impl/CoreMessage.java | 52 +++++++++++---------
1 file changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/175d77a5/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index b548b29..2052991 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -213,14 +213,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public ActiveMQBuffer getReadOnlyBodyBuffer() {
checkEncode();
- internalWritableBuffer();
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
@Override
public int getBodyBufferSize() {
checkEncode();
- internalWritableBuffer();
return endOfBodyPosition - BUFFER_HEADER_SPACE;
}
@@ -319,6 +317,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
if (!validBuffer) {
encode();
}
+ internalWritableBuffer();
}
@Override
@@ -364,10 +363,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private void internalWritableBuffer() {
if (writableBuffer == null) {
- writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
- if (endOfBodyPosition > 0) {
- writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET);
- writableBuffer.resetReaderIndex();
+ synchronized (this) {
+ if (writableBuffer == null) {
+ writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
+ if (endOfBodyPosition > 0) {
+ writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET);
+ writableBuffer.resetReaderIndex();
+ }
+ }
}
}
}
@@ -404,22 +407,27 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
- this.body = other.body;
- this.endOfBodyPosition = other.endOfBodyPosition;
- this.messageID = other.messageID;
- this.address = other.address;
- this.type = other.type;
- this.durable = other.durable;
- this.expiration = other.expiration;
- this.timestamp = other.timestamp;
- this.priority = other.priority;
- this.userID = other.userID;
- this.coreMessageObjectPools = other.coreMessageObjectPools;
- if (copyProperties != null) {
- this.properties = new TypedProperties(copyProperties);
- }
- if (other.buffer != null) {
- this.buffer = other.buffer.copy();
+ // This MUST be synchronized using the monitor on the other message to prevent it from running concurrently
+ // with getEncodedBuffer(); otherwise it can introduce a race condition when delivering concurrently to
+ // many subscriptions and bridging to other nodes in a cluster
+ synchronized (other) {
+ this.body = other.body;
+ this.endOfBodyPosition = other.endOfBodyPosition;
+ this.messageID = other.messageID;
+ this.address = other.address;
+ this.type = other.type;
+ this.durable = other.durable;
+ this.expiration = other.expiration;
+ this.timestamp = other.timestamp;
+ this.priority = other.priority;
+ this.userID = other.userID;
+ this.coreMessageObjectPools = other.coreMessageObjectPools;
+ if (copyProperties != null) {
+ this.properties = new TypedProperties(copyProperties);
+ }
+ if (other.buffer != null) {
+ this.buffer = other.buffer.copy();
+ }
}
}
[2/2] activemq-artemis git commit: This closes #2381
Posted by jb...@apache.org.
This closes #2381
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef7ff38d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef7ff38d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef7ff38d
Branch: refs/heads/master
Commit: ef7ff38de982515617430aa653bda7feda086739
Parents: f70075a 175d77a
Author: Justin Bertram <jb...@apache.org>
Authored: Thu Oct 18 17:18:28 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Oct 18 17:18:28 2018 -0500
----------------------------------------------------------------------
.../artemis/core/message/impl/CoreMessage.java | 52 +++++++++++---------
1 file changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------