You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/18 17:23:44 UTC
qpid-broker-j git commit: QPID-7775: [Java Broker] [Flow to disk]
Ensure that a newly enqueued message that is flowed to disk does not
immediately have meta-data reloaded
Repository: qpid-broker-j
Updated Branches:
refs/heads/master f0772ebfe -> 8ae1d142b
QPID-7775: [Java Broker] [Flow to disk] Ensure that a newly enqueued message that is flowed to disk does not immediately have meta-data reloaded
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8ae1d142
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8ae1d142
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8ae1d142
Branch: refs/heads/master
Commit: 8ae1d142b33edc91d4988c9f4b775026bb03acc4
Parents: f0772eb
Author: Keith Wall <ke...@gmail.com>
Authored: Thu May 18 18:17:38 2017 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Thu May 18 18:22:26 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/protocol/v0_10/ServerSession.java | 4 +++-
.../server/protocol/v1_0/StandardReceivingLinkEndpoint.java | 5 +++--
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ae1d142/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 805b54b..02b6a61 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -960,10 +960,12 @@ public class ServerSession extends SessionInvoker
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ // locally cache arrival time to ensure that we don't reload metadata
+ final long arrivalTime = message.getArrivalTime();
final RoutingResult<MessageTransferMessage> result =
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
int enqueues = result.send(_transaction, null);
- getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
incrementOutstandingTxnsIfNecessary();
return enqueues;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ae1d142/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 351f0b6..86771ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -237,7 +237,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
try
{
Session_1_0 session = getSession();
-
+ // locally cache arrival time to ensure that we don't reload metadata
+ final long arrivalTime = serverMessage.getArrivalTime();
session.getAMQPConnection()
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
getReceivingDestination().authorizePublish(session.getSecurityToken(), routingAddress);
@@ -287,7 +288,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
updateDisposition(deliveryTag, resultantState, settled);
getSession().getAMQPConnection()
- .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
+ .registerMessageReceived(serverMessage.getSize(), arrivalTime);
if (!(transaction instanceof AutoCommitTransaction))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org