You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/07/19 14:45:08 UTC

qpid-broker-j git commit: QPID-7866 : [Java Broker] AMQP 1.0 publishing links do not register themselves with queues

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 83eb7addd -> f2bce8185


QPID-7866 : [Java Broker] AMQP 1.0 publishing links do not register themselves with queues


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f2bce818
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f2bce818
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f2bce818

Branch: refs/heads/master
Commit: f2bce81856864b0c4833f3a538f5e5d729d58184
Parents: 83eb7ad
Author: rgodfrey <rg...@apache.org>
Authored: Wed Jul 19 16:44:55 2017 +0200
Committer: rgodfrey <rg...@apache.org>
Committed: Wed Jul 19 16:45:01 2017 +0200

----------------------------------------------------------------------
 .../apache/qpid/server/queue/AbstractQueue.java | 10 ++-
 .../v1_0/StandardReceivingLinkEndpoint.java     | 70 +++++++++++++++++++-
 2 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f2bce818/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 21d24f2..5f9bd40 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -3512,7 +3512,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             _linkedSenders.put(sender, oldValue+1);
         }
-        _bindingCount++;
+        if(Binding.TYPE.equals(link.getType()))
+        {
+            _bindingCount++;
+        }
     }
 
     @Override
@@ -3523,7 +3526,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             _linkedSenders.put(sender, oldValue-1);
         }
-        _bindingCount--;
+        if(Binding.TYPE.equals(link.getType()))
+        {
+            _bindingCount--;
+        }
     }
 
     private void validateOrCreateAlternateBinding(final Queue<?> queue, final boolean mayCreate)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f2bce818/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index e08399c..0a5915a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,8 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -60,9 +65,49 @@ import org.apache.qpid.server.txn.ServerTransaction;
 public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
+    private static final String LINK = "link";
 
     private ReceivingDestination _receivingDestination;
 
+    private final PublishingLink _publishingLink = new PublishingLink() // descriptor this endpoint passes to linkAdded/linkRemoved
+    {
+        @Override
+        public String getName()
+        {
+            return getLinkName(); // name is taken from the enclosing endpoint
+        }
+
+        @Override
+        public String getType()
+        {
+            return LINK; // the constant "link"; AbstractQueue only adjusts _bindingCount for links whose type equals Binding.TYPE
+        }
+
+        @Override
+        public String getDestination()
+        {
+            final ReceivingDestination receivingDestination = _receivingDestination; // read the field once so the check and the call use the same object
+            return receivingDestination == null ? "" : receivingDestination.getAddress(); // empty string while no destination is set; use the local, not the field, to avoid an NPE if the field is cleared in between
+        }
+    };
+
+    private final MessageSender _messageSender = new MessageSender() // sender object this endpoint passes to linkAdded/linkRemoved together with _publishingLink
+    {
+        @Override
+        public void destinationRemoved(final MessageDestination destination)
+        {
+            // Currently a no-op. TODO - we should probably schedule a link closure here!
+        }
+
+        @Override
+        public Collection<? extends PublishingLink> getPublishingLinks(final MessageDestination destination)
+        {
+            final ReceivingDestination receivingDestination = _receivingDestination; // read the field once; only the local is used below
+            MessageDestination actualDestination = receivingDestination == null ? null : receivingDestination.getMessageDestination(); // null when no receiving destination is set
+            return actualDestination != null && actualDestination.equals(destination) ? Collections.singleton(_publishingLink) : Collections.emptyList(); // single link if the argument equals the current message destination, otherwise empty
+        }
+    };
+
     public StandardReceivingLinkEndpoint(final Session_1_0 session,
                                          final Link_1_0<Source, Target> link)
     {
@@ -374,7 +419,30 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
     public void setDestination(final ReceivingDestination receivingDestination)
     {
-        _receivingDestination = receivingDestination;
+        if(_receivingDestination != receivingDestination) // reference comparison: nothing happens when the same object is set again
+        {
+            if (_receivingDestination != null && _receivingDestination.getMessageDestination() != null)
+            {
+                _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); // unregister from the previous destination before switching
+            }
+            _receivingDestination = receivingDestination; // may be null, which leaves the endpoint without a destination
+            if(receivingDestination != null && receivingDestination.getMessageDestination() != null)
+            {
+                receivingDestination.getMessageDestination().linkAdded(_messageSender, _publishingLink); // register with the new destination
+            }
+
+        }
+    }
+
+    @Override
+    public void destroy()
+    {
+        super.destroy(); // superclass teardown runs before the link is unregistered
+        if(_receivingDestination != null && _receivingDestination.getMessageDestination() != null)
+        {
+            _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); // unregister this endpoint's link from its current destination
+            _receivingDestination = null; // cleared only on this path, i.e. when a message destination was present
+        }
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org