You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/12/27 11:36:08 UTC
[qpid-broker-j] 03/03: QPID-8385 : Apply simpler version of fix
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 7210fc645005923652591ae83365f7bc9f90ef88
Author: rgodfrey <rg...@apache.org>
AuthorDate: Thu Dec 26 16:19:11 2019 +0100
QPID-8385 : Apply simpler version of fix
(cherry picked from commit 7e3a450cbeba471466d215de6c31a1eb1f5f2128)
---
.../apache/qpid/server/queue/AbstractQueue.java | 34 +++-------------------
1 file changed, 4 insertions(+), 30 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 546ad32..4382249 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1760,38 +1760,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private void dequeueEntry(final QueueEntry node)
{
- ServerTransaction txn = new AsyncAutoCommitTransaction(getVirtualHost().getMessageStore(), this::onEntryDequeue);
+ ServerTransaction txn = new AsyncAutoCommitTransaction(getVirtualHost().getMessageStore(), (future, action) -> action.postCommit());
dequeueEntry(node, txn);
}
- private void onEntryDequeue(final ListenableFuture<Void> future,
- final ServerTransaction.Action action)
- {
- Futures.addCallback(future,
- new FutureCallback<Void>()
- {
- @Override
- public void onSuccess(
- final Void result)
- {
- executeTask(String.format("Dequeue-PostCommit:%s", getName()), action::postCommit);
- }
-
- @Override
- public void onFailure(
- final Throwable t)
- {
- executeTask(String.format("Dequeue-OnRollback:%s", getName()), action::onRollback);
- }
- }, MoreExecutors.directExecutor());
-
- }
-
- private void executeTask(final String name, final Runnable runnable)
- {
- getVirtualHost().executeTask(name, runnable, getSystemTaskControllerContext(name, _virtualHost.getPrincipal()));
- }
-
private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
txn.dequeue(node.getEnqueueRecord(),
@@ -3863,7 +3835,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (getState() == State.ACTIVE)
{
String taskName = String.format("Queue Housekeeping : %s : TTL Update", getName());
- executeTask(taskName, this::updateQueueEntryExpiration);
+ getVirtualHost().executeTask(taskName,
+ this::updateQueueEntryExpiration,
+ getSystemTaskControllerContext(taskName, _virtualHost.getPrincipal()));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org