You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/16 13:17:24 UTC

[3/3] qpid-broker-j git commit: QPID-8076: [Broker-J] [AMQP 0-9-1] [Publisher Confirms] Delay sending publish confirms until underlying store transaction completes.

QPID-8076: [Broker-J] [AMQP 0-9-1] [Publisher Confirms] Delay sending publish confirms until underlying store transaction completes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f03f718f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f03f718f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f03f718f

Branch: refs/heads/master
Commit: f03f718f98545f15859197f9a1f78f76a985e0a3
Parents: 17d4b56
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jan 15 16:31:07 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 13:13:47 2018 +0000

----------------------------------------------------------------------
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 27 +++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f03f718f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index a12b483..2c4e88a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -45,6 +45,7 @@ import javax.security.auth.Subject;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -482,9 +483,29 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         {
                             if (_confirmOnPublish)
                             {
-                                BasicAckBody responseBody = _connection.getMethodRegistry()
-                                                                       .createBasicAckBody(_confirmedMessageCounter, false);
-                                _connection.writeFrame(responseBody.generateFrame(_channelId));
+                                recordFuture(Futures.immediateFuture(null),
+                                             new ServerTransaction.Action()
+                                             {
+                                                 private final long _deliveryTag = _confirmedMessageCounter;
+
+                                                 @Override
+                                                 public void postCommit()
+                                                 {
+                                                     BasicAckBody body = _connection.getMethodRegistry()
+                                                                                    .createBasicAckBody(
+                                                                                            _deliveryTag, false);
+                                                     _connection.writeFrame(body.generateFrame(_channelId));
+                                                 }
+
+                                                 @Override
+                                                 public void onRollback()
+                                                 {
+                                                     final BasicNackBody body = new BasicNackBody(_deliveryTag,
+                                                                                                  false,
+                                                                                                  false);
+                                                     _connection.writeFrame(new AMQFrame(_channelId, body));
+                                                 }
+                                             });
                             }
                         }
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org