You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/17 21:28:57 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1045 Performance
improvements on AMQP
Repository: activemq-artemis
Updated Branches:
refs/heads/master 861c23155 -> 224d78062
ARTEMIS-1045 Performance improvements on AMQP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/291a4719
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/291a4719
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/291a4719
Branch: refs/heads/master
Commit: 291a4719b6b114b1452a272fd13393262f736a05
Parents: 861c231
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 17 11:14:18 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 17 16:11:14 2017 -0400
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 68 +++++++++++---------
.../amqp/proton/ProtonServerSenderContext.java | 13 ----
2 files changed, 39 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 60aae4c..653ee5f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage {
private DeliveryAnnotations _deliveryAnnotations;
private MessageAnnotations _messageAnnotations;
private Properties _properties;
+ private int appLocation = -1;
private ApplicationProperties applicationProperties;
private long scheduledTime = -1;
private String connectionID;
@@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage {
public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat;
- this.protonMessage = (MessageImpl)message;
+ this.protonMessage = (MessageImpl) message;
}
@@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage {
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
_properties = new Properties();
this.applicationProperties = new ApplicationProperties(new HashMap<>());
- this.protonMessage = (MessageImpl)Message.Factory.create();
+ this.protonMessage = (MessageImpl) Message.Factory.create();
this.protonMessage.setApplicationProperties(applicationProperties);
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
}
@@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage {
private ApplicationProperties getApplicationProperties() {
parseHeaders();
+
+ if (applicationProperties == null && appLocation >= 0) {
+ ByteBuffer buffer = getBuffer().nioBuffer();
+ buffer.position(appLocation);
+ TLSEncode.getDecoder().setByteBuffer(buffer);
+ Object section = TLSEncode.getDecoder().readObject();
+ if (section instanceof ApplicationProperties) {
+ this.applicationProperties = (ApplicationProperties) section;
+ }
+ this.appLocation = -1;
+ TLSEncode.getDecoder().setByteBuffer(null);
+
+ }
+
return applicationProperties;
}
@@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage {
parsedHeaders = true;
}
}
+
@Override
public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
this.connectionID = connectionID;
@@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage {
return connectionID;
}
-
public MessageAnnotations getMessageAnnotations() {
parseHeaders();
return _messageAnnotations;
@@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage {
return null;
}
-
private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value);
}
@@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage {
return null;
} */
-
return null;
}
-
@Override
public SimpleString getGroupID() {
parseHeaders();
@@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
-
@Override
public Long getScheduledDeliveryTime() {
@@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage {
this.expiration = _properties.getAbsoluteExpiryTime().getTime();
}
- if (buffer.hasRemaining()) {
- section = (Section) decoder.readObject();
- } else {
- section = null;
- }
+ // We don't read the next section on purpose, as we will parse ApplicationProperties
+ // lazily
+ section = null;
}
if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section;
+ } else {
+ if (buffer.hasRemaining()) {
+ this.appLocation = buffer.position();
+ } else {
+ this.appLocation = -1;
+ }
}
} finally {
decoder.setByteBuffer(null);
@@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage {
}
}
-
@Override
public Object getDuplicateProperty() {
return null;
}
-
@Override
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
return null;
@@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage {
if (address == null) {
Properties properties = getProtonMessage().getProperties();
if (properties != null) {
- return properties.getTo();
+ return properties.getTo();
} else {
return null;
}
@@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage {
header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
TLSEncode.getEncoder().writeObject(header);
- TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
}
buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
@@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
- return (Boolean)getApplicationPropertiesMap().get(key);
+ return (Boolean) getApplicationPropertiesMap().get(key);
}
@Override
public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
- return (Byte)getApplicationPropertiesMap().get(key);
+ return (Byte) getApplicationPropertiesMap().get(key);
}
@Override
public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
- return (Double)getApplicationPropertiesMap().get(key);
+ return (Double) getApplicationPropertiesMap().get(key);
}
@Override
public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
- return (Integer)getApplicationPropertiesMap().get(key);
+ return (Integer) getApplicationPropertiesMap().get(key);
}
@Override
public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
- return (Long)getApplicationPropertiesMap().get(key);
+ return (Long) getApplicationPropertiesMap().get(key);
}
@Override
@@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
- return (Short)getApplicationPropertiesMap().get(key);
+ return (Short) getApplicationPropertiesMap().get(key);
}
@Override
public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
- return (Float)getApplicationPropertiesMap().get(key);
+ return (Float) getApplicationPropertiesMap().get(key);
}
@Override
@@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage {
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
return getConnectionID();
} else {
- return (String)getApplicationPropertiesMap().get(key);
+ return (String) getApplicationPropertiesMap().get(key);
}
}
@@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
- return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key));
+ return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
}
@Override
@@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
- memoryEstimate = memoryOffset +
- (data != null ? data.capacity() : 0);
+ memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
}
return memoryEstimate;
@@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
-
@Override
public SimpleString getReplyTo() {
if (getProperties() != null) {
@@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage {
return this;
}
-
@Override
public int getPersistSize() {
checkBuffer();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 962110e..0e0447f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
-import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean multicast;
//todo get this from somewhere
private RoutingType defaultRoutingType = RoutingType.ANYCAST;
- protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
private RoutingType routingTypeToUse = defaultRoutingType;
private boolean shared = false;
private boolean global = false;
@@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
@Override
public void onFlow(int currentCredits, boolean drain) {
- this.creditsSemaphore.setCredits(currentCredits);
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
}
@@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return 0;
}
- if (!creditsSemaphore.tryAcquire()) {
- try {
- creditsSemaphore.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // nothing to be done here.. we just keep going
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
// presettle means we can settle the message on the dealer side before we send it, i.e.
// for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
[2/3] activemq-artemis git commit: ARTEMIS-1046 Fixing TX eventually
stalling with AMQP
Posted by ta...@apache.org.
ARTEMIS-1046 Fixing TX eventually stalling with AMQP
I have also reviewed the model in which we used transactions
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ef4dcf7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ef4dcf7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ef4dcf7
Branch: refs/heads/master
Commit: 1ef4dcf7d921379618363d035ea8a09794c3637d
Parents: 291a471
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 17 15:59:34 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 17 16:50:56 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPConnectionCallback.java | 25 ++++--
.../amqp/broker/AMQPSessionCallback.java | 30 +++----
.../amqp/proton/AMQPSessionContext.java | 2 +-
.../proton/ProtonServerReceiverContext.java | 4 +-
.../amqp/proton/ProtonServerSenderContext.java | 2 +-
.../transaction/ProtonTransactionHandler.java | 78 ++++++++++-------
.../protocol/amqp/util/DeliveryUtil.java | 18 +---
.../integration/amqp/AmqpTransactionTest.java | 90 +++++++++++++++++++-
8 files changed, 165 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 7e7dc60..4265f28 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
- private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+ private ConcurrentMap<Binary, Transaction> transactions = new ConcurrentHashMap<>();
private final ProtonProtocolManager manager;
@@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public Binary newTransaction() {
XidImpl xid = newXID();
+ Binary binary = new Binary(xid.getGlobalTransactionId());
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
- transactions.put(xid, transaction);
- return new Binary(xid.getGlobalTransactionId());
+ transactions.put(binary, transaction);
+ return binary;
}
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- XidImpl xid = newXID(txid.getArray());
- Transaction tx = transactions.get(xid);
+ public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+ Transaction tx;
+
+ if (remove) {
+ tx = transactions.remove(txid);
+ } else {
+ tx = transactions.get(txid);
+ }
if (tx == null) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+ logger.warn("Couldn't find txid = " + txid);
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString());
}
return tx;
}
- public void removeTransaction(Binary txid) {
+ public Transaction removeTransaction(Binary txid) {
XidImpl xid = newXID(txid.getArray());
- transactions.remove(xid);
+ return transactions.remove(xid);
}
protected XidImpl newXID() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7f7e22b..3592dbc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
-import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false);
+ public Object getProtonLock() {
+ return connection.getLock();
+ }
+
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback {
condition.setDescription(errorMessage);
Rejected rejected = new Rejected();
rejected.setError(condition);
- delivery.disposition(rejected);
- delivery.settle();
+ synchronized (connection.getLock()) {
+ delivery.disposition(rejected);
+ delivery.settle();
+ }
connection.flush();
}
@@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return protonSPI.getTransaction(txid);
+ public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+ return protonSPI.getTransaction(txid, remove);
}
public Binary newTransaction() {
return protonSPI.newTransaction();
}
- public void commitTX(Binary txid) throws Exception {
- Transaction tx = protonSPI.getTransaction(txid);
- tx.commit(true);
- protonSPI.removeTransaction(txid);
- }
-
- public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
- Transaction tx = protonSPI.getTransaction(txid);
- tx.rollback();
- protonSPI.removeTransaction(txid);
- }
-
- public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
- ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
- }
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index d1fc0e1..ccc4a6c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable {
}
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
- ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+ ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index f08c1fc..54467cf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
- tx = this.sessionSPI.getTransaction(txState.getTxnId());
+ tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
@@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} else {
synchronized (connection.getLock()) {
receiver.flow(credits);
- connection.flush();
}
+ connection.flush();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0e0447f..5a97c02 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -493,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState;
- ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
+ ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
if (txState.getOutcome() != null) {
settleImmediate = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 721bd33..12498b0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
/**
* handles an amqp Coordinator to deal with transaction boundaries etc
*/
@@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
public static final int DEFAULT_COORDINATOR_CREDIT = 100;
+ public static final int CREDIT_LOW_WATERMARK = 30;
final AMQPSessionCallback sessionSPI;
+ final AMQPConnectionContext connection;
- public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
+ public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
this.sessionSPI = sessionSPI;
+ this.connection = connection;
}
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
final Receiver receiver;
try {
receiver = ((Receiver) delivery.getLink());
@@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
return;
}
- receiver.recv(new NettyWritable(buffer));
+ byte[] buffer;
+
+ synchronized (connection.getLock()) {
+ // Replenish coordinator receiver credit on exhaustion so sender can continue
+ // transaction declare and discahrge operations.
+ if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
+ receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+ }
+
+ buffer = new byte[delivery.available()];
+ receiver.recv(buffer, 0, buffer.length);
+ receiver.advance();
+ }
+
- receiver.advance();
MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
@@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
- delivery.disposition(declared);
+ synchronized (connection.getLock()) {
+ delivery.disposition(declared);
+ }
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
Binary txID = discharge.getTxnId();
- sessionSPI.dischargeTx(txID);
+ ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
+ tx.discharge();
+
if (discharge.getFail()) {
- try {
- sessionSPI.rollbackTX(txID, true);
+ tx.rollback();
+ synchronized (connection.getLock()) {
delivery.disposition(new Accepted());
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
}
+ connection.flush();
} else {
- try {
- sessionSPI.commitTX(txID);
+ tx.commit();
+ synchronized (connection.getLock()) {
delivery.disposition(new Accepted());
- } catch (ActiveMQAMQPException amqpE) {
- throw amqpE;
- } catch (Exception e) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
- }
-
- // Replenish coordinator receiver credit on exhaustion so sender can continue
- // transaction declare and discahrge operations.
- if (receiver.getCredit() == 0) {
- receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+ connection.flush();
}
}
} catch (ActiveMQAMQPException amqpE) {
- delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
- } catch (Exception e) {
+ log.warn(amqpE.getMessage(), amqpE);
+ synchronized (connection.getLock()) {
+ delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+ }
+ connection.flush();
+ } catch (Throwable e) {
log.warn(e.getMessage(), e);
- delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+ synchronized (connection.getLock()) {
+ delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+ }
+ connection.flush();
} finally {
- delivery.settle();
- buffer.release();
+ synchronized (connection.getLock()) {
+ delivery.settle();
+ }
+ connection.flush();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
index 9257c6b..4267b85 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
@@ -16,28 +16,14 @@
*/
package org.apache.activemq.artemis.protocol.amqp.util;
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
public class DeliveryUtil {
- public static int readDelivery(Receiver receiver, ByteBuf buffer) {
- int initial = buffer.writerIndex();
- // optimization by norman
- int count;
- while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
- // Increment the writer index by the number of bytes written into it while calling recv.
- buffer.writerIndex(buffer.writerIndex() + count);
- buffer.ensureWritable(count);
- }
- return buffer.writerIndex() - initial;
- }
-
- public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
+ public static MessageImpl decodeMessageImpl(byte[] data) {
MessageImpl message = (MessageImpl) Message.Factory.create();
- message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+ message.decode(data, 0, data.length);
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index c00cc1c..41bc5e7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,9 +17,20 @@
package org.apache.activemq.artemis.tests.integration.amqp;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
+
+ @Test(timeout = 120000)
+ public void testSendPersistentTX() throws Exception {
+ int MESSAGE_COUNT = 100000;
+ AtomicInteger errors = new AtomicInteger(0);
+ server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
+ ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+ Connection sendConnection = factory.createConnection();
+ Connection consumerConnection = factory.createConnection();
+ try {
+
+ Thread receiverThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ javax.jms.Queue q1 = consumerSession.createQueue("q1");
+
+ MessageConsumer consumer = consumerSession.createConsumer(q1);
+
+ for (int i = 1; i <= MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(5000);
+ if (message == null) {
+ throw new IOException("No message read in time.");
+ }
+
+ if (i % 100 == 0) {
+ if (i % 1000 == 0) System.out.println("Read message " + i);
+ consumerSession.commit();
+ }
+ }
+
+ // Assure that all messages are consumed
+ consumerSession.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ receiverThread.start();
+
+ Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ javax.jms.Queue q1 = sendingSession.createQueue("q1");
+ MessageProducer producer = sendingSession.createProducer(q1);
+ producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT);
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ if (i % 100 == 0) {
+ if (i % 1000 == 0) System.out.println("Sending " + i);
+ sendingSession.commit();
+ }
+ }
+
+ sendingSession.commit();
+
+ receiverThread.join(50000);
+ Assert.assertFalse(receiverThread.isAlive());
+
+ Assert.assertEquals(0, errors.get());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ sendConnection.close();
+ consumerConnection.close();
+ }
+
+ }
}
[3/3] activemq-artemis git commit: This closes #1102
Posted by ta...@apache.org.
This closes #1102
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/224d7806
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/224d7806
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/224d7806
Branch: refs/heads/master
Commit: 224d780622be3f65cef027bd4f84078102d25919
Parents: 861c231 1ef4dcf
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 17 17:28:23 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 17 17:28:23 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPConnectionCallback.java | 25 ++++--
.../protocol/amqp/broker/AMQPMessage.java | 68 ++++++++-------
.../amqp/broker/AMQPSessionCallback.java | 30 +++----
.../amqp/proton/AMQPSessionContext.java | 2 +-
.../proton/ProtonServerReceiverContext.java | 4 +-
.../amqp/proton/ProtonServerSenderContext.java | 15 +---
.../transaction/ProtonTransactionHandler.java | 78 ++++++++++-------
.../protocol/amqp/util/DeliveryUtil.java | 18 +---
.../integration/amqp/AmqpTransactionTest.java | 90 +++++++++++++++++++-
9 files changed, 204 insertions(+), 126 deletions(-)
----------------------------------------------------------------------