You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/22 14:55:17 UTC
[1/3] activemq-artemis git commit: This closes #1114
Repository: activemq-artemis
Updated Branches:
refs/heads/master 10c9d797d -> d963a4a75
This closes #1114
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d963a4a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d963a4a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d963a4a7
Branch: refs/heads/master
Commit: d963a4a7581cca1f08c0d010dcd0bdc12b04bb60
Parents: 10c9d79 0bfb39b
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Mar 22 09:55:02 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500
----------------------------------------------------------------------
.../impl/netty/ActiveMQChannelHandler.java | 6 +-
.../netty/PartialPooledByteBufAllocator.java | 6 +-
.../protocol/amqp/broker/AMQPMessage.java | 11 +-
.../amqp/proton/ProtonServerSenderContext.java | 159 ++++++++++---------
.../amqp/proton/handler/ProtonHandler.java | 2 +-
.../hornetq/HornetQProtocolManager.java | 2 +-
.../protocol/core/impl/CoreProtocolManager.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 12 +-
8 files changed, 110 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
[3/3] activemq-artemis git commit: ARTEMIS-1051 Adding synchronized
calls on ACKs
Posted by jb...@apache.org.
ARTEMIS-1051 Adding synchronized calls on ACKs
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ac7cafb2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ac7cafb2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ac7cafb2
Branch: refs/heads/master
Commit: ac7cafb210c2da126fe2f22ec54fcfe5de648d18
Parents: 10c9d79
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 21 10:33:36 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/ServerConsumerImpl.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ac7cafb2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 710a22b..e812f2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -731,7 +731,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* This will be useful for other protocols that will need this such as openWire or MQTT.
*/
@Override
- public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,
+ public synchronized List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,
Object protocolDataStart,
Object protocolDataEnd) {
LinkedList<MessageReference> retReferences = new LinkedList<>();
@@ -766,7 +766,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public void acknowledge(Transaction tx, final long messageID) throws Exception {
+ public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception {
if (browseOnly) {
return;
}
@@ -830,7 +830,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
+ public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
if (browseOnly) {
return;
}
@@ -892,7 +892,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public void individualCancel(final long messageID, boolean failed) throws Exception {
+ public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
if (browseOnly) {
return;
}
@@ -911,12 +911,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public void backToDelivering(MessageReference reference) {
+ public synchronized void backToDelivering(MessageReference reference) {
deliveringRefs.addFirst(reference);
}
@Override
- public MessageReference removeReferenceByID(final long messageID) throws Exception {
+ public synchronized MessageReference removeReferenceByID(final long messageID) throws Exception {
if (browseOnly) {
return null;
}
[2/3] activemq-artemis git commit: ARTEMIS-1056 Performance
improvements on AMQP
Posted by jb...@apache.org.
ARTEMIS-1056 Performance improvements on AMQP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0bfb39bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0bfb39bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0bfb39bf
Branch: refs/heads/master
Commit: 0bfb39bfb577200499f6ee2a80a00b65c2ef8d02
Parents: ac7cafb
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 21 11:43:06 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500
----------------------------------------------------------------------
.../impl/netty/ActiveMQChannelHandler.java | 6 +-
.../netty/PartialPooledByteBufAllocator.java | 6 +-
.../protocol/amqp/broker/AMQPMessage.java | 11 +-
.../amqp/proton/ProtonServerSenderContext.java | 159 ++++++++++---------
.../amqp/proton/handler/ProtonHandler.java | 2 +-
.../hornetq/HornetQProtocolManager.java | 2 +-
.../protocol/core/impl/CoreProtocolManager.java | 2 +-
7 files changed, 104 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index 93be281..cc4407c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
- handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+ try {
+ handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+ } finally {
+ buffer.release();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
index 5e67952..3a192e8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
@@ -53,17 +53,17 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator {
@Override
public ByteBuf ioBuffer() {
- return UNPOOLED.heapBuffer();
+ return POOLED.directBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
- return UNPOOLED.heapBuffer(initialCapacity);
+ return POOLED.directBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
- return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+ return POOLED.directBuffer(initialCapacity, maxCapacity);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 7de9577..d02eace 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -63,7 +63,7 @@ public class AMQPMessage extends RefCountMessage {
final long messageFormat;
ByteBuf data;
boolean bufferValid;
- boolean durable;
+ Boolean durable;
long messageID;
String address;
MessageImpl protonMessage;
@@ -491,10 +491,15 @@ public class AMQPMessage extends RefCountMessage {
@Override
public boolean isDurable() {
+ if (durable != null) {
+ return durable;
+ }
+
if (getHeader() != null && getHeader().getDurable() != null) {
- return getHeader().getDurable().booleanValue();
- } else {
+ durable = getHeader().getDurable().booleanValue();
return durable;
+ } else {
+ return durable != null ? durable : false;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index d24464c..780ca4d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -481,89 +481,100 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return;
}
- Message message = ((MessageReference) delivery.getContext()).getMessage();
+ try {
+ Message message = ((MessageReference) delivery.getContext()).getMessage();
- boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+ boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
- DeliveryState remoteState = delivery.getRemoteState();
-
- boolean settleImmediate = true;
- if (remoteState != null) {
- // If we are transactional then we need ack if the msg has been accepted
- if (remoteState instanceof TransactionalState) {
-
- TransactionalState txState = (TransactionalState) remoteState;
- ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
-
- if (txState.getOutcome() != null) {
- settleImmediate = false;
- Outcome outcome = txState.getOutcome();
- if (outcome instanceof Accepted) {
- if (!delivery.remotelySettled()) {
- TransactionalState txAccepted = new TransactionalState();
- txAccepted.setOutcome(Accepted.getInstance());
- txAccepted.setTxnId(txState.getTxnId());
- delivery.disposition(txAccepted);
- }
- // we have to individual ack as we can't guarantee we will get the delivery
- // updates (including acks) in order
- // from dealer, a perf hit but a must
- try {
- sessionSPI.ack(tx, brokerConsumer, message);
- tx.addDelivery(delivery, this);
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ DeliveryState remoteState;
+
+ synchronized (connection.getLock()) {
+ remoteState = delivery.getRemoteState();
+ }
+
+ boolean settleImmediate = true;
+ if (remoteState != null) {
+ // If we are transactional then we need ack if the msg has been accepted
+ if (remoteState instanceof TransactionalState) {
+
+ TransactionalState txState = (TransactionalState) remoteState;
+ ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
+
+ if (txState.getOutcome() != null) {
+ settleImmediate = false;
+ Outcome outcome = txState.getOutcome();
+ if (outcome instanceof Accepted) {
+ if (!delivery.remotelySettled()) {
+ TransactionalState txAccepted = new TransactionalState();
+ txAccepted.setOutcome(Accepted.getInstance());
+ txAccepted.setTxnId(txState.getTxnId());
+ synchronized (connection.getLock()) {
+ delivery.disposition(txAccepted);
+ }
+ }
+ // we have to individual ack as we can't guarantee we will get the delivery
+ // updates (including acks) in order
+ // from dealer, a perf hit but a must
+ try {
+ sessionSPI.ack(tx, brokerConsumer, message);
+ tx.addDelivery(delivery, this);
+ } catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ }
}
}
- }
- } else if (remoteState instanceof Accepted) {
- //this can happen in the twice ack mode, that is the receiver accepts and settles separately
- //acking again would show an exception but would have no negative effect but best to handle anyway.
- if (delivery.isSettled()) {
- return;
- }
- // we have to individual ack as we can't guarantee we will get the delivery updates
- // (including acks) in order
- // from dealer, a perf hit but a must
- try {
- sessionSPI.ack(null, brokerConsumer, message);
- } catch (Exception e) {
- log.warn(e.toString(), e);
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
- }
- } else if (remoteState instanceof Released) {
- try {
- sessionSPI.cancel(brokerConsumer, message, false);
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
- }
- } else if (remoteState instanceof Rejected) {
- try {
- sessionSPI.cancel(brokerConsumer, message, true);
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
- }
- } else if (remoteState instanceof Modified) {
- try {
- Modified modification = (Modified) remoteState;
- if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
- sessionSPI.cancel(brokerConsumer, message, true);
- } else {
+ } else if (remoteState instanceof Accepted) {
+ //this can happen in the twice ack mode, that is the receiver accepts and settles separately
+ //acking again would show an exception but would have no negative effect but best to handle anyway.
+ if (delivery.isSettled()) {
+ return;
+ }
+ // we have to individual ack as we can't guarantee we will get the delivery updates
+ // (including acks) in order
+ // from dealer, a perf hit but a must
+ try {
+ sessionSPI.ack(null, brokerConsumer, message);
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ }
+ } else if (remoteState instanceof Released) {
+ try {
sessionSPI.cancel(brokerConsumer, message, false);
+ } catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+ }
+ } else if (remoteState instanceof Rejected) {
+ try {
+ sessionSPI.cancel(brokerConsumer, message, true);
+ } catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+ }
+ } else if (remoteState instanceof Modified) {
+ try {
+ Modified modification = (Modified) remoteState;
+ if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
+ sessionSPI.cancel(brokerConsumer, message, true);
+ } else {
+ sessionSPI.cancel(brokerConsumer, message, false);
+ }
+ } catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
- }
- // todo add tag caching
- if (!preSettle) {
- protonSession.replaceTag(delivery.getTag());
- }
+ // todo add tag caching
+ if (!preSettle) {
+ protonSession.replaceTag(delivery.getTag());
+ }
- if (settleImmediate) settle(delivery);
+ if (settleImmediate)
+ settle(delivery);
- } else {
- // todo not sure if we need to do anything here
+ } else {
+ // todo not sure if we need to do anything here
+ }
+ } finally {
+ connection.flush();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 673a688..6b66f62 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -240,7 +240,7 @@ public class ProtonHandler extends ProtonInitializable {
}
// For returning PooledBytes
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
ByteBuffer head = transport.head();
head.position(offset);
head.limit(offset + size);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
index 2f6ed2f..de7d2ff 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
@@ -49,7 +49,7 @@ class HornetQProtocolManager extends CoreProtocolManager {
buffer.getByte(5) == 'T' &&
buffer.getByte(6) == 'Q') {
//todo add some handshaking
- buffer.readBytes(7);
+ buffer.skipBytes(7);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index cc81fbe..2cfd451 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -174,7 +174,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
//if we are not an old client then handshake
if (isArtemis(buffer)) {
- buffer.readBytes(7);
+ buffer.skipBytes(7);
}
}