You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/07/19 14:45:08 UTC
qpid-broker-j git commit: QPID-7866 : [Java Broker] AMQP 1.0
publishing links do not register themselves with queues
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 83eb7addd -> f2bce8185
QPID-7866 : [Java Broker] AMQP 1.0 publishing links do not register themselves with queues
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f2bce818
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f2bce818
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f2bce818
Branch: refs/heads/master
Commit: f2bce81856864b0c4833f3a538f5e5d729d58184
Parents: 83eb7ad
Author: rgodfrey <rg...@apache.org>
Authored: Wed Jul 19 16:44:55 2017 +0200
Committer: rgodfrey <rg...@apache.org>
Committed: Wed Jul 19 16:45:01 2017 +0200
----------------------------------------------------------------------
.../apache/qpid/server/queue/AbstractQueue.java | 10 ++-
.../v1_0/StandardReceivingLinkEndpoint.java | 70 +++++++++++++++++++-
2 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f2bce818/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 21d24f2..5f9bd40 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -3512,7 +3512,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
_linkedSenders.put(sender, oldValue+1);
}
- _bindingCount++;
+ if(Binding.TYPE.equals(link.getType()))
+ {
+ _bindingCount++;
+ }
}
@Override
@@ -3523,7 +3526,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
_linkedSenders.put(sender, oldValue-1);
}
- _bindingCount--;
+ if(Binding.TYPE.equals(link.getType()))
+ {
+ _bindingCount--;
+ }
}
private void validateOrCreateAlternateBinding(final Queue<?> queue, final boolean mayCreate)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f2bce818/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index e08399c..0a5915a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,8 +32,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -60,9 +65,49 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
+ private static final String LINK = "link";
private ReceivingDestination _receivingDestination;
+ private final PublishingLink _publishingLink = new PublishingLink() // describes this receiving endpoint (name, type, destination address) to the destination it is registered with
+ {
+ @Override
+ public String getName()
+ {
+ return getLinkName(); // the name is delegated to the enclosing endpoint
+ }
+
+ @Override
+ public String getType()
+ {
+ return LINK; // the constant "link" declared on the enclosing class
+ }
+
+ @Override
+ public String getDestination()
+ {
+ final ReceivingDestination receivingDestination = _receivingDestination; // read the mutable field once; setDestination()/destroy() may reassign or null it
+ return receivingDestination == null ? "" : receivingDestination.getAddress(); // dereference the snapshot, not the field, so the null check above actually guards this call
+ }
+ };
+
+ private final MessageSender _messageSender = new MessageSender() // sender handle passed to linkAdded()/linkRemoved() together with _publishingLink
+ {
+ @Override
+ public void destinationRemoved(final MessageDestination destination)
+ {
+ // TODO - we should probably schedule a link closure here!
+ }
+
+ @Override
+ public Collection<? extends PublishingLink> getPublishingLinks(final MessageDestination destination)
+ {
+ final ReceivingDestination receivingDestination = _receivingDestination; // read the mutable field once and work on the snapshot
+ MessageDestination actualDestination = receivingDestination == null ? null : receivingDestination.getMessageDestination(); // null when no destination is currently set
+ return actualDestination != null && actualDestination.equals(destination) ? Collections.singleton(_publishingLink) : Collections.emptyList(); // the single link when the argument equals the current destination, otherwise empty
+ }
+ };
+
public StandardReceivingLinkEndpoint(final Session_1_0 session,
final Link_1_0<Source, Target> link)
{
@@ -374,7 +419,30 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
public void setDestination(final ReceivingDestination receivingDestination)
{
- _receivingDestination = receivingDestination;
+ if(_receivingDestination != receivingDestination) // identity comparison: setting the same instance again does nothing
+ {
+ if (_receivingDestination != null && _receivingDestination.getMessageDestination() != null)
+ {
+ _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); // remove this link from the previous message destination
+ }
+ _receivingDestination = receivingDestination;
+ if(receivingDestination != null && receivingDestination.getMessageDestination() != null)
+ {
+ receivingDestination.getMessageDestination().linkAdded(_messageSender, _publishingLink); // add this link to the new message destination
+ }
+
+ }
+ }
+
+ @Override
+ public void destroy() // runs the superclass destroy, then removes this link from the current message destination, if there is one
+ {
+ super.destroy();
+ if(_receivingDestination != null && _receivingDestination.getMessageDestination() != null)
+ {
+ _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); // mirrors the linkAdded() call made in setDestination()
+ _receivingDestination = null; // NOTE(review): only cleared inside this branch; the field keeps its value when getMessageDestination() is null
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org