You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/15 12:56:01 UTC
qpid-broker-j git commit: QPID-7889: [Java Broker] [AMQP 0-91] On basic.nack, release the message for delivery elsewhere
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 2ca0d1676 -> 0406c8734
QPID-7889: [Java Broker] [AMQP 0-91] On basic.nack, release the message for delivery elsewhere
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0406c873
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0406c873
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0406c873
Branch: refs/heads/master
Commit: 0406c87342ddf296ff9d4b4697923c1d397b9ed3
Parents: 2ca0d16
Author: Keith Wall <kw...@apache.org>
Authored: Tue Aug 15 13:53:09 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Aug 15 13:55:39 2017 +0100
----------------------------------------------------------------------
.../qpid/server/message/MessageInstance.java | 2 +-
.../qpid/server/queue/QueueEntryImpl.java | 28 +++++++++-----------
.../AbstractSystemMessageSource.java | 2 +-
.../qpid/server/queue/MockMessageInstance.java | 2 +-
.../server/queue/QueueEntryImplTestBase.java | 4 +--
.../qpid/server/protocol/v0_8/AMQChannel.java | 3 ++-
.../server/management/amqp/ManagementNode.java | 2 +-
.../management/amqp/ManagementResponse.java | 2 +-
8 files changed, 21 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 081c96d..45e5807 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -57,7 +57,7 @@ public interface MessageInstance
boolean isRedelivered();
- void reject();
+ void reject(final MessageInstanceConsumer<?> consumer);
boolean isRejectedBy(MessageInstanceConsumer consumer);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index c3975c8..5b0cb85 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -39,8 +40,6 @@ import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.AlternateBinding;
-import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -58,7 +57,9 @@ public abstract class QueueEntryImpl implements QueueEntry
private final MessageReference _message;
- private Set<Object> _rejectedBy = null;
+ private volatile Set<Object> _rejectedBy = null;
+ private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> _rejectedByUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(QueueEntryImpl.class, Set.class, "_rejectedBy");
private static final EntryState HELD_STATE = new EntryState()
{
@@ -493,23 +494,18 @@ public abstract class QueueEntryImpl implements QueueEntry
}
@Override
- public void reject()
+ public void reject(final MessageInstanceConsumer<?> consumer)
{
- QueueConsumer<?,?> consumer = getAcquiringConsumer();
-
- if (consumer != null)
+ if (consumer == null)
{
- if (_rejectedBy == null)
- {
- _rejectedBy = new HashSet<>();
- }
-
- _rejectedBy.add(consumer.getIdentifier());
+ throw new IllegalArgumentException("consumer must not be null");
}
- else
+
+ if (_rejectedBy == null)
{
- _log.warn("Requesting rejection by null subscriber:" + this);
+ _rejectedByUpdater.compareAndSet(this, null, Collections.newSetFromMap(new ConcurrentHashMap<>()));
}
+ _rejectedBy.add(consumer);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index b77e2a4..e9b53be 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -296,7 +296,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
}
@Override
- public void reject()
+ public void reject(final MessageInstanceConsumer<?> consumer)
{
delete();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 58d37fa..57c1d12 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -146,7 +146,7 @@ public class MockMessageInstance implements MessageInstance
}
@Override
- public void reject()
+ public void reject(final MessageInstanceConsumer<?> consumer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index a2580db..35c8c46 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -261,7 +261,7 @@ public abstract class QueueEntryImplTestBase extends QpidTestCase
//acquire, reject, and release the message using the consumer
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
- _queueEntry.reject();
+ _queueEntry.reject(sub);
_queueEntry.release();
//verify the rejection is recorded
@@ -272,7 +272,7 @@ public abstract class QueueEntryImplTestBase extends QpidTestCase
assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
- _queueEntry.reject();
+ _queueEntry.reject(sub2);
//verify it still records being rejected by both consumers
assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 922040e..84c052b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2332,7 +2332,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
else
{
- message.reject();
+ message.reject(unackedMessageConsumerAssociation.getConsumer());
final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
if (_logger.isDebugEnabled())
@@ -2359,6 +2359,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
else
{
message.incrementDeliveryCount();
+ message.release(unackedMessageConsumerAssociation.getConsumer());
}
}
else
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 6af70b0..0c2b456 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1684,7 +1684,7 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
}
@Override
- public void reject()
+ public void reject(final MessageInstanceConsumer<?> consumer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0406c873/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index bc1bd81..9d06ed0 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -125,7 +125,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public void reject()
+ public void reject(final MessageInstanceConsumer<?> consumer)
{
delete();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org