You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/15 12:56:01 UTC

qpid-broker-j git commit: QPID-7889: [Java Broker] [AMQP 0-91] On basic.nack, release the message for delivery elsewhere

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 2ca0d1676 -> 0406c8734


QPID-7889: [Java Broker] [AMQP 0-91] On basic.nack, release the message for delivery elsewhere


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0406c873
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0406c873
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0406c873

Branch: refs/heads/master
Commit: 0406c87342ddf296ff9d4b4697923c1d397b9ed3
Parents: 2ca0d16
Author: Keith Wall <kw...@apache.org>
Authored: Tue Aug 15 13:53:09 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Aug 15 13:55:39 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/message/MessageInstance.java    |  2 +-
 .../qpid/server/queue/QueueEntryImpl.java       | 28 +++++++++-----------
 .../AbstractSystemMessageSource.java            |  2 +-
 .../qpid/server/queue/MockMessageInstance.java  |  2 +-
 .../server/queue/QueueEntryImplTestBase.java    |  4 +--
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  3 ++-
 .../server/management/amqp/ManagementNode.java  |  2 +-
 .../management/amqp/ManagementResponse.java     |  2 +-
 8 files changed, 21 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 081c96d..45e5807 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -57,7 +57,7 @@ public interface MessageInstance
 
     boolean isRedelivered();
 
-    void reject();
+    void reject(final MessageInstanceConsumer<?> consumer);
 
     boolean isRejectedBy(MessageInstanceConsumer consumer);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index c3975c8..5b0cb85 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -39,8 +40,6 @@ import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.AlternateBinding;
-import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -58,7 +57,9 @@ public abstract class QueueEntryImpl implements QueueEntry
 
     private final MessageReference _message;
 
-    private Set<Object> _rejectedBy = null;
+    private volatile Set<Object> _rejectedBy = null;
+    private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> _rejectedByUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(QueueEntryImpl.class, Set.class, "_rejectedBy");
 
     private static final EntryState HELD_STATE = new EntryState()
     {
@@ -493,23 +494,18 @@ public abstract class QueueEntryImpl implements QueueEntry
     }
 
     @Override
-    public void reject()
+    public void reject(final MessageInstanceConsumer<?> consumer)
     {
-        QueueConsumer<?,?> consumer = getAcquiringConsumer();
-
-        if (consumer != null)
+        if (consumer == null)
         {
-            if (_rejectedBy == null)
-            {
-                _rejectedBy = new HashSet<>();
-            }
-
-            _rejectedBy.add(consumer.getIdentifier());
+            throw new IllegalArgumentException("consumer must not be null");
         }
-        else
+
+        if (_rejectedBy == null)
         {
-            _log.warn("Requesting rejection by null subscriber:" + this);
+            _rejectedByUpdater.compareAndSet(this, null, Collections.newSetFromMap(new ConcurrentHashMap<>()));
         }
+        _rejectedBy.add(consumer.getIdentifier());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index b77e2a4..e9b53be 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -296,7 +296,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         }
 
         @Override
-        public void reject()
+        public void reject(final MessageInstanceConsumer<?> consumer)
         {
             delete();
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 58d37fa..57c1d12 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -146,7 +146,7 @@ public class MockMessageInstance implements MessageInstance
     }
 
     @Override
-    public void reject()
+    public void reject(final MessageInstanceConsumer<?> consumer)
     {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index a2580db..35c8c46 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -261,7 +261,7 @@ public abstract class QueueEntryImplTestBase extends QpidTestCase
 
         //acquire, reject, and release the message using the consumer
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
-        _queueEntry.reject();
+        _queueEntry.reject(sub);
         _queueEntry.release();
 
         //verify the rejection is recorded
@@ -272,7 +272,7 @@ public abstract class QueueEntryImplTestBase extends QpidTestCase
 
         assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
-        _queueEntry.reject();
+        _queueEntry.reject(sub2);
 
         //verify it still records being rejected by both consumers
         assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 922040e..84c052b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2332,7 +2332,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     }
                     else
                     {
-                        message.reject();
+                        message.reject(unackedMessageConsumerAssociation.getConsumer());
 
                         final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
                         if (_logger.isDebugEnabled())
@@ -2359,6 +2359,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                             else
                             {
                                 message.incrementDeliveryCount();
+                                message.release(unackedMessageConsumerAssociation.getConsumer());
                             }
                         }
                         else

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 6af70b0..0c2b456 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1684,7 +1684,7 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
         }
 
         @Override
-        public void reject()
+        public void reject(final MessageInstanceConsumer<?> consumer)
         {
 
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index bc1bd81..9d06ed0 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -125,7 +125,7 @@ class ManagementResponse implements MessageInstance
     }
 
     @Override
-    public void reject()
+    public void reject(final MessageInstanceConsumer<?> consumer)
     {
         delete();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org