You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/16 13:17:24 UTC
[3/3] qpid-broker-j git commit: QPID-8076: [Broker-J] [AMQP 0-9-1]
[Publisher Confirms] Delay sending publish confirms until underlying store
transaction completes.
QPID-8076: [Broker-J] [AMQP 0-9-1] [Publisher Confirms] Delay sending publish confirms until underlying store transaction completes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f03f718f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f03f718f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f03f718f
Branch: refs/heads/master
Commit: f03f718f98545f15859197f9a1f78f76a985e0a3
Parents: 17d4b56
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jan 15 16:31:07 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 13:13:47 2018 +0000
----------------------------------------------------------------------
.../qpid/server/protocol/v0_8/AMQChannel.java | 27 +++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f03f718f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index a12b483..2c4e88a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -45,6 +45,7 @@ import javax.security.auth.Subject;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -482,9 +483,29 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
{
if (_confirmOnPublish)
{
- BasicAckBody responseBody = _connection.getMethodRegistry()
- .createBasicAckBody(_confirmedMessageCounter, false);
- _connection.writeFrame(responseBody.generateFrame(_channelId));
+ recordFuture(Futures.immediateFuture(null),
+ new ServerTransaction.Action()
+ {
+ private final long _deliveryTag = _confirmedMessageCounter;
+
+ @Override
+ public void postCommit()
+ {
+ BasicAckBody body = _connection.getMethodRegistry()
+ .createBasicAckBody(
+ _deliveryTag, false);
+ _connection.writeFrame(body.generateFrame(_channelId));
+ }
+
+ @Override
+ public void onRollback()
+ {
+ final BasicNackBody body = new BasicNackBody(_deliveryTag,
+ false,
+ false);
+ _connection.writeFrame(new AMQFrame(_channelId, body));
+ }
+ });
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org