You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/10/08 13:51:23 UTC
svn commit: r1707505 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-plugins/amqp-0-10-protoco...
Author: orudyy
Date: Thu Oct 8 11:51:23 2015
New Revision: 1707505
URL: http://svn.apache.org/viewvc?rev=1707505&view=rev
Log:
QPID-6765: [Java Broker] Make sure AbstractSystemMessageSource.Consumer is being closed on 0-10 path and remove MessageSource.ConsumerRegistrationListener
(work done by Lorenz Quack <qu...@gmail.com> and Alex Rudyy <or...@gmail.com>)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Thu Oct 8 11:51:23 2015
@@ -39,18 +39,8 @@ public interface MessageSource extends T
Collection<? extends ConsumerImpl> getConsumers();
- void addConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
-
- void removeConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
-
boolean verifySessionAccess(AMQSessionModel<?> session);
- interface ConsumerRegistrationListener<Q extends MessageSource>
- {
- void consumerAdded(Q source, ConsumerImpl consumer);
- void consumerRemoved(Q queue, ConsumerImpl consumer);
- }
-
/**
* ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
* already exists.
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Oct 8 11:51:23 2015
@@ -77,7 +77,6 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
@@ -248,9 +247,6 @@ public abstract class AbstractQueue<X ex
private MessageGroupManager _messageGroupManager;
- private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners =
- new ArrayList<ConsumerRegistrationListener<? super MessageSource>>();
-
private QueueNotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -891,14 +887,6 @@ public abstract class AbstractQueue<X ex
if (!isDeleted())
{
- synchronized (_consumerListeners)
- {
- for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
- {
- listener.consumerAdded(this, consumer);
- }
- }
-
_consumerList.add(consumer);
if (isDeleted())
@@ -955,14 +943,6 @@ public abstract class AbstractQueue<X ex
resetSubPointersForGroups(consumer);
}
- synchronized (_consumerListeners)
- {
- for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
- {
- listener.consumerRemoved(this, consumer);
- }
- }
-
// auto-delete queues must be deleted if there are no remaining subscribers
if(!consumer.isTransient()
@@ -1006,22 +986,6 @@ public abstract class AbstractQueue<X ex
}
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
- {
- synchronized (_consumerListeners)
- {
- _consumerListeners.add(listener);
- }
- }
-
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
- {
- synchronized (_consumerListeners)
- {
- _consumerListeners.remove(listener);
- }
- }
-
public void resetSubPointersForGroups(QueueConsumer<?> consumer)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Oct 8 11:51:23 2015
@@ -26,9 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -54,9 +52,6 @@ public abstract class AbstractSystemMess
protected final UUID _id;
protected final String _name;
protected final VirtualHost<?, ?, ?> _virtualHost;
- private final CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>>
- _consumerRegistrationListeners =
- new CopyOnWriteArrayList<>();
private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
public AbstractSystemMessageSource(
@@ -98,10 +93,6 @@ public abstract class AbstractSystemMess
final Consumer consumer = new Consumer(consumerName, target);
target.consumerAdded(consumer);
_consumers.add(consumer);
- for (ConsumerRegistrationListener<? super MessageSource> listener : _consumerRegistrationListeners)
- {
- listener.consumerAdded(this, consumer);
- }
return consumer;
}
@@ -112,18 +103,6 @@ public abstract class AbstractSystemMess
}
@Override
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
- {
- _consumerRegistrationListeners.add(listener);
- }
-
- @Override
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
- {
- _consumerRegistrationListeners.remove(listener);
- }
-
- @Override
public boolean verifySessionAccess(final AMQSessionModel<?> session)
{
return true;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Oct 8 11:51:23 2015
@@ -20,13 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -139,6 +135,10 @@ public class ConsumerTarget_0_10 extends
releaseSendLock();
}
+ for (ConsumerImpl consumer : _consumers)
+ {
+ consumer.close();
+ }
return closed;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Oct 8 11:51:23 2015
@@ -831,6 +831,10 @@ public class AMQChannel
*/
public boolean unsubscribeConsumer(AMQShortString consumerTag)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this);
+ }
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
@@ -907,25 +911,11 @@ public class AMQChannel
}
}
- for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
+ Set<AMQShortString> subscriptionTags = new HashSet<>(_tag2SubscriptionTargetMap.keySet());
+ for (AMQShortString tag : subscriptionTags)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
- }
-
- Collection<ConsumerImpl> subs = me.getValue().getConsumers();
-
- if(subs != null)
- {
- for(ConsumerImpl sub : subs)
- {
- sub.close();
- }
- }
+ unsubscribeConsumer(tag);
}
-
- _tag2SubscriptionTargetMap.clear();
}
/**
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Oct 8 11:51:23 2015
@@ -406,7 +406,7 @@ public abstract class ConsumerTarget_0_8
return _creditManager;
}
-
+ @Override
public boolean close()
{
boolean closed = false;
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1707505&r1=1707504&r2=1707505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Oct 8 11:51:23 2015
@@ -36,7 +36,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -107,9 +106,6 @@ class ManagementNode implements MessageS
private final UUID _id;
- private final CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>> _consumerRegistrationListeners =
- new CopyOnWriteArrayList<ConsumerRegistrationListener<? super MessageSource>>();
-
private final SystemNodeCreator.SystemNodeRegistry _registry;
private final ConfiguredObject<?> _managedObject;
private Map<String, ManagementNodeConsumer> _consumers = new ConcurrentHashMap<String, ManagementNodeConsumer>();
@@ -967,10 +963,6 @@ class ManagementNode implements MessageS
final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
target.consumerAdded(managementNodeConsumer);
_consumers.put(consumerName, managementNodeConsumer);
- for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerRegistrationListeners)
- {
- listener.consumerAdded(this, managementNodeConsumer);
- }
return managementNodeConsumer;
}
@@ -981,18 +973,6 @@ class ManagementNode implements MessageS
}
@Override
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
- {
- _consumerRegistrationListeners.add(listener);
- }
-
- @Override
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
- {
- _consumerRegistrationListeners.remove(listener);
- }
-
- @Override
public boolean verifySessionAccess(final AMQSessionModel<?> session)
{
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org