You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/10/08 13:51:23 UTC

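svn commit: r1707505 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/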

Author: orudyy
Date: Thu Oct  8 11:51:23 2015
New Revision: 1707505

URL: http://svn.apache.org/viewvc?rev=1707505&view=rev
Log:
QPID-6765: [Java Broker] Make sure AbstractSystemMessageSource.Consumer is closed on the 0-10 path, and remove MessageSource.ConsumerRegistrationListener

(work done by Lorenz Quack <qu...@gmail.com> and Alex Rudyy <or...@gmail.com>)

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Thu Oct  8 11:51:23 2015
@@ -39,18 +39,8 @@ public interface MessageSource extends T
 
     Collection<? extends ConsumerImpl> getConsumers();
 
-    void addConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
-
-    void removeConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
-
     boolean verifySessionAccess(AMQSessionModel<?> session);
 
-    interface ConsumerRegistrationListener<Q extends MessageSource>
-    {
-        void consumerAdded(Q source, ConsumerImpl consumer);
-        void consumerRemoved(Q queue, ConsumerImpl consumer);
-    }
-
     /**
      * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
      * already exists.

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Oct  8 11:51:23 2015
@@ -77,7 +77,6 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Binding;
@@ -248,9 +247,6 @@ public abstract class AbstractQueue<X ex
 
     private MessageGroupManager _messageGroupManager;
 
-    private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners =
-            new ArrayList<ConsumerRegistrationListener<? super MessageSource>>();
-
     private QueueNotificationListener  _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
@@ -891,14 +887,6 @@ public abstract class AbstractQueue<X ex
 
         if (!isDeleted())
         {
-            synchronized (_consumerListeners)
-            {
-                for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
-                {
-                    listener.consumerAdded(this, consumer);
-                }
-            }
-
             _consumerList.add(consumer);
 
             if (isDeleted())
@@ -955,14 +943,6 @@ public abstract class AbstractQueue<X ex
                 resetSubPointersForGroups(consumer);
             }
 
-            synchronized (_consumerListeners)
-            {
-                for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
-                {
-                    listener.consumerRemoved(this, consumer);
-                }
-            }
-
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if(!consumer.isTransient()
@@ -1006,22 +986,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
-    {
-        synchronized (_consumerListeners)
-        {
-            _consumerListeners.add(listener);
-        }
-    }
-
-    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
-    {
-        synchronized (_consumerListeners)
-        {
-            _consumerListeners.remove(listener);
-        }
-    }
-
     public void resetSubPointersForGroups(QueueConsumer<?> consumer)
     {
         QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Oct  8 11:51:23 2015
@@ -26,9 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -54,9 +52,6 @@ public abstract class AbstractSystemMess
     protected final UUID _id;
     protected final String _name;
     protected final VirtualHost<?, ?, ?> _virtualHost;
-    private final CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>>
-            _consumerRegistrationListeners =
-            new CopyOnWriteArrayList<>();
     private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
 
     public AbstractSystemMessageSource(
@@ -98,10 +93,6 @@ public abstract class AbstractSystemMess
         final Consumer consumer = new Consumer(consumerName, target);
         target.consumerAdded(consumer);
         _consumers.add(consumer);
-        for (ConsumerRegistrationListener<? super MessageSource> listener : _consumerRegistrationListeners)
-        {
-            listener.consumerAdded(this, consumer);
-        }
         return consumer;
     }
 
@@ -112,18 +103,6 @@ public abstract class AbstractSystemMess
     }
 
     @Override
-    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
-    {
-        _consumerRegistrationListeners.add(listener);
-    }
-
-    @Override
-    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
-    {
-        _consumerRegistrationListeners.remove(listener);
-    }
-
-    @Override
     public boolean verifySessionAccess(final AMQSessionModel<?> session)
     {
         return true;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Oct  8 11:51:23 2015
@@ -20,13 +20,9 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -139,6 +135,10 @@ public class ConsumerTarget_0_10 extends
             releaseSendLock();
         }
 
+        for (ConsumerImpl consumer : _consumers)
+        {
+            consumer.close();
+        }
         return closed;
 
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Oct  8 11:51:23 2015
@@ -831,6 +831,10 @@ public class AMQChannel
      */
     public boolean unsubscribeConsumer(AMQShortString consumerTag)
     {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this);
+        }
 
         ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
         Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
@@ -907,25 +911,11 @@ public class AMQChannel
             }
         }
 
-        for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
+        Set<AMQShortString> subscriptionTags = new HashSet<>(_tag2SubscriptionTargetMap.keySet());
+        for (AMQShortString tag : subscriptionTags)
         {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
-            }
-
-            Collection<ConsumerImpl> subs = me.getValue().getConsumers();
-
-            if(subs != null)
-            {
-                for(ConsumerImpl sub : subs)
-                {
-                    sub.close();
-                }
-            }
+            unsubscribeConsumer(tag);
         }
-
-        _tag2SubscriptionTargetMap.clear();
     }
 
     /**

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Oct  8 11:51:23 2015
@@ -406,7 +406,7 @@ public abstract class ConsumerTarget_0_8
         return _creditManager;
     }
 
-
+    @Override
     public boolean close()
     {
         boolean closed = false;

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Oct  8 11:51:23 2015
@@ -36,7 +36,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -107,9 +106,6 @@ class ManagementNode implements MessageS
 
     private final UUID _id;
 
-    private final CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>> _consumerRegistrationListeners =
-            new CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>>();
-
     private final SystemNodeCreator.SystemNodeRegistry _registry;
     private final ConfiguredObject<?> _managedObject;
     private Map<String, ManagementNodeConsumer> _consumers = new ConcurrentHashMap<String, ManagementNodeConsumer>();
@@ -967,10 +963,6 @@ class ManagementNode implements MessageS
         final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
         target.consumerAdded(managementNodeConsumer);
         _consumers.put(consumerName, managementNodeConsumer);
-        for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerRegistrationListeners)
-        {
-            listener.consumerAdded(this, managementNodeConsumer);
-        }
         return managementNodeConsumer;
     }
 
@@ -981,18 +973,6 @@ class ManagementNode implements MessageS
     }
 
     @Override
-    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
-    {
-        _consumerRegistrationListeners.add(listener);
-    }
-
-    @Override
-    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
-    {
-        _consumerRegistrationListeners.remove(listener);
-    }
-
-    @Override
     public boolean verifySessionAccess(final AMQSessionModel<?> session)
     {
         return true;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org