You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/21 14:09:30 UTC
[activemq-artemis] 02/03: ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 25d0b511ce31387a1baa767873af76e9635b1bd3
Author: Michael Pearce <mi...@me.com>
AuthorDate: Wed Aug 21 08:19:32 2019 +0100
ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 2 +-
.../proton/transaction/ProtonTransactionImpl.java | 2 +-
.../apache/activemq/artemis/core/server/Queue.java | 6 +++
.../artemis/core/server/ServerConsumer.java | 2 +
.../artemis/core/server/impl/QueueImpl.java | 48 ++++++++++++++++-
.../artemis/core/server/impl/RefsOperation.java | 7 ++-
.../core/server/impl/ServerConsumerImpl.java | 9 +++-
.../core/transaction/TransactionOperation.java | 4 ++
.../core/transaction/impl/TransactionImpl.java | 62 +++++++++++-----------
.../server/impl/ScheduledDeliveryHandlerTest.java | 10 ++++
.../tests/integration/cli/DummyServerConsumer.java | 5 ++
.../tests/integration/client/JMSOrderTest.java | 50 +++++++++++++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++
13 files changed, 180 insertions(+), 37 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index dc249ff..4b2b669 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -363,7 +363,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
- consumer.close(false);
+ consumer.close(false, true);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 123dbb5..83128e1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
- super(xid, storageManager, timeoutSeconds);
+ super(xid, storageManager, timeoutSeconds, true);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index bab38d6..8b91aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent {
ReferenceCounter getConsumersRefCount();
+ /* Called when a message is cancelled back into the queue */
+ void addSorted(List<MessageReference> refs, boolean scheduling);
+
void reload(MessageReference ref);
void addTail(MessageReference ref);
@@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent {
void addHead(MessageReference ref, boolean scheduling);
+ /* Called when a message is cancelled back into the queue */
+ void addSorted(MessageReference ref, boolean scheduling);
+
void addHead(List<MessageReference> refs, boolean scheduling);
void acknowledge(MessageReference ref) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index f1f8b1e..0c9c5bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void close(boolean failed) throws Exception;
+ void close(boolean failed, boolean sorted) throws Exception;
+
/**
* This method is just to remove itself from Queues.
* If for any reason during a close an exception occurred, the exception treatment
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e19d9ef..090a83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -896,6 +896,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
+ public void addSorted(final MessageReference ref, boolean scheduling) {
+ enterCritical(CRITICAL_PATH_ADD_HEAD);
+ synchronized (this) {
+ try {
+ if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+ return;
+ }
+
+ internalAddSorted(ref);
+
+ directDeliver = false;
+ } finally {
+ leaveCritical(CRITICAL_PATH_ADD_HEAD);
+ }
+ }
+ }
+
+ /* Called when a message is cancelled back into the queue */
+ @Override
public void addHead(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
@@ -913,6 +932,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
+ /* Called when a message is cancelled back into the queue */
+ @Override
+ public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+ enterCritical(CRITICAL_PATH_ADD_HEAD);
+ synchronized (this) {
+ try {
+ for (MessageReference ref : refs) {
+ addSorted(ref, scheduling);
+ }
+
+ resetAllIterators();
+
+ deliverAsync();
+ } finally {
+ leaveCritical(CRITICAL_PATH_ADD_HEAD);
+ }
+ }
+ }
+
@Override
public synchronized void reload(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
@@ -3461,13 +3499,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
void postRollback(final LinkedList<MessageReference> refs) {
+ postRollback(refs, false);
+ }
+
+ void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
purgeAfterRollback(refs);
return;
}
- addHead(refs, false);
+ if (sorted) {
+ addSorted(refs, false);
+ } else {
+ addHead(refs, false);
+ }
}
private void purgeAfterRollback(LinkedList<MessageReference> refs) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index c8d9297..925f439 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -79,6 +79,11 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterRollback(final Transaction tx) {
+ afterRollback(tx, false);
+ }
+
+ @Override
+ public void afterRollback(final Transaction tx, boolean sorted) {
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
long timeBase = System.currentTimeMillis();
@@ -109,7 +114,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
synchronized (queue) {
- queue.postRollback(refs);
+ queue.postRollback(refs, sorted);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 54cf9a2..ddba797 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -529,7 +529,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
- public synchronized void close(final boolean failed) throws Exception {
+ public void close(final boolean failed) throws Exception {
+ close(failed, false);
+ }
+
+ @Override
+ public synchronized void close(final boolean failed, boolean sorted) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
@@ -555,7 +560,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
List<MessageReference> refs = cancelRefs(failed, false, null);
- Transaction tx = new TransactionImpl(storageManager);
+ Transaction tx = new TransactionImpl(storageManager, sorted);
refs.forEach(ref -> {
if (logger.isTraceEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
index 5da1d97..5c7e7e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
@@ -52,6 +52,10 @@ public interface TransactionOperation {
*/
void afterRollback(Transaction tx);
+ default void afterRollback(Transaction tx, boolean sorted) {
+ afterRollback(tx);
+ }
+
List<MessageReference> getRelatedMessageReferences();
List<MessageReference> getListOnConsumer(long consumerID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index d459975..95983b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -63,6 +63,8 @@ public class TransactionImpl implements Transaction {
private final long createTime;
+ private final boolean sorted;
+
private volatile boolean containsPersistent;
private int timeoutSeconds = -1;
@@ -96,47 +98,45 @@ public class TransactionImpl implements Transaction {
}
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
- this.storageManager = storageManager;
-
- xid = null;
-
- id = storageManager.generateID();
-
- createTime = System.currentTimeMillis();
-
- this.timeoutSeconds = timeoutSeconds;
+ this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
}
public TransactionImpl(final StorageManager storageManager) {
- this.storageManager = storageManager;
-
- xid = null;
-
- id = storageManager.generateID();
+ this(storageManager, false);
+ }
- createTime = System.currentTimeMillis();
+ public TransactionImpl(final StorageManager storageManager, boolean sorted) {
+ this(storageManager.generateID(), null, storageManager,-1, sorted);
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
- this.storageManager = storageManager;
-
- this.xid = xid;
+ this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
+ }
- id = storageManager.generateID();
+ public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
+ this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
+ }
- createTime = System.currentTimeMillis();
+ public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+ this(id, xid, storageManager, -1, false);
+ }
- this.timeoutSeconds = timeoutSeconds;
+ public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
+ this(id, xid, storageManager, -1, sorted);
}
- public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+ private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
this.storageManager = storageManager;
this.xid = xid;
this.id = id;
- createTime = System.currentTimeMillis();
+ this.createTime = System.currentTimeMillis();
+
+ this.timeoutSeconds = timeoutSeconds;
+
+ this.sorted = sorted;
}
// Transaction implementation
@@ -217,7 +217,7 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
}
- internalRollback();
+ internalRollback(sorted);
if (exception != null) {
throw exception;
@@ -276,7 +276,7 @@ public class TransactionImpl implements Transaction {
return;
}
if (state == State.ROLLBACK_ONLY) {
- internalRollback();
+ internalRollback(sorted);
if (exception != null) {
throw exception;
@@ -379,11 +379,11 @@ public class TransactionImpl implements Transaction {
}
}
- internalRollback();
+ internalRollback(sorted);
}
}
- private void internalRollback() throws Exception {
+ private void internalRollback(boolean sorted) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::internalRollback " + this);
}
@@ -418,7 +418,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
- afterRollback(operationsToComplete);
+ afterRollback(operationsToComplete, sorted);
}
});
@@ -432,7 +432,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
- afterRollback(storeOperationsToComplete);
+ afterRollback(storeOperationsToComplete, sorted);
}
});
}
@@ -562,10 +562,10 @@ public class TransactionImpl implements Transaction {
}
}
- private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
+ private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
if (operationsToComplete != null) {
for (TransactionOperation operation : operationsToComplete) {
- operation.afterRollback(this);
+ operation.afterRollback(this, sorted);
}
// Help out GC here
operationsToComplete.clear();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ea15264..b94bd3a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1036,6 +1036,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void addSorted(List<MessageReference> refs, boolean scheduling) {
+ addHead(refs, scheduling);
+ }
+
+ @Override
public void reload(MessageReference ref) {
}
@@ -1056,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void addSorted(MessageReference ref, boolean scheduling) {
+
+ }
+
+ @Override
public void addHead(List<MessageReference> refs, boolean scheduling) {
for (MessageReference ref : refs) {
addFirst(ref);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index ee7bdbb..9858357 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -91,6 +91,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
+ public void close(boolean failed, boolean sorted) throws Exception {
+
+ }
+
+ @Override
public void removeItself() throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index dcc5a40..f4087d4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -129,4 +129,54 @@ public class JMSOrderTest extends JMSTestBase {
}
+ @Test(timeout = 60000)
+ public void testReceiveSomeThenClose() throws Exception {
+ Connection connection = protocolCF.createConnection();
+ try {
+ connection.start();
+
+ int totalCount = 5;
+ int consumeBeforeRollback = 2;
+
+ sendToAmqQueue(totalCount);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 1; i <= consumeBeforeRollback; i++) {
+ Message message = consumer.receive(3000);
+ assertNotNull(message);
+ assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
+ }
+
+ session.close();
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ queue = session.createQueue(name.getMethodName());
+ consumer = session.createConsumer(queue);
+
+ // Consume again.. the previously consumed messages should get delivered
+ // again after the rollback and then the remainder should follow
+ List<Integer> messageNumbers = new ArrayList<>();
+ for (int i = 1; i <= totalCount; i++) {
+ Message message = consumer.receive(3000);
+ assertNotNull("Failed to receive message: " + i, message);
+ int msgNum = message.getIntProperty("nr");
+ System.out.println("Received " + msgNum);
+ messageNumbers.add(msgNum);
+ }
+
+ session.commit();
+
+ assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
+ for (int i = 0; i < messageNumbers.size(); i++) {
+ assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
+ }
+ } finally {
+ connection.close();
+ }
+
+ }
+
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 612d621..da25078 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -269,6 +269,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void addSorted(MessageReference ref, boolean scheduling) {
+
+ }
+
+ @Override
public void addHead(List<MessageReference> ref, boolean scheduling) {
// no-op
@@ -460,6 +465,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void addSorted(List<MessageReference> refs, boolean scheduling) {
+
+ }
+
+ @Override
public Set<Consumer> getConsumers() {
// no-op
return null;