You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 12:24:58 UTC

svn commit: r1772527 [2/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ broker-core/src/mai...

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun Dec  4 12:24:57 2016
@@ -80,7 +80,7 @@ abstract class AbstractQueueTestBase ext
     private String _routingKey = "routing key";
     private DirectExchangeImpl _exchange;
     private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
-    private QueueConsumer<?> _consumer;
+    private QueueConsumer<?,?> _consumer;
     private Map<String,Object> _arguments = Collections.emptyMap();
 
     @Override
@@ -171,7 +171,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageA = createMessage(new Long(24));
 
         // Check adding a consumer adds it to the queue
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         assertEquals("Queue does not have consumer", 1,
@@ -203,7 +203,7 @@ abstract class AbstractQueueTestBase ext
     {
         ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA, null, null);
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
@@ -221,7 +221,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageB = createMessage(new Long(25));
         _queue.enqueue(messageA, null, null);
         _queue.enqueue(messageB, null, null);
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
@@ -244,7 +244,7 @@ abstract class AbstractQueueTestBase ext
         AMQMessageHeader messageHeader = messageA.getMessageHeader();
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
         _queue.enqueue(messageA, null, null);
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
@@ -270,7 +270,7 @@ abstract class AbstractQueueTestBase ext
         AMQMessageHeader messageHeader = messageA.getMessageHeader();
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
         _queue.enqueue(messageA, null, null);
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
@@ -296,7 +296,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageB = createMessage(new Long(25));
         _queue.enqueue(messageB, null, null);
 
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
@@ -324,7 +324,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageC = createMessage(new Long(26));
 
 
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
 
@@ -394,7 +394,7 @@ abstract class AbstractQueueTestBase ext
             }
         };
 
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.SEES_REQUEUES,
                                                                      ConsumerOption.ACQUIRES), 0);
 
@@ -465,7 +465,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageB = createMessage(new Long(25));
         ServerMessage messageC = createMessage(new Long(26));
 
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
 
@@ -562,7 +562,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageA = createMessage(new Long(24));
         // Check adding an exclusive consumer adds it to the queue
 
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.EXCLUSIVE, ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
 
@@ -599,14 +599,14 @@ abstract class AbstractQueueTestBase ext
         // Check we cannot add an exclusive subscriber to a queue with an
         // existing consumer
         _consumer.close();
-        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+        _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                                           EnumSet.of(ConsumerOption.ACQUIRES,
                                                                      ConsumerOption.SEES_REQUEUES), 0);
 
         try
         {
 
-            _consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
+            _consumer = (QueueConsumer<?,?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
                                                               EnumSet.of(ConsumerOption.EXCLUSIVE), 0);
 
         }

Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java Sun Dec  4 12:24:57 2016
@@ -245,7 +245,7 @@ class LegacyAccessControlAdapter
         }
         else if (configuredObject instanceof QueueConsumer)
         {
-            Queue<?> queue = (Queue<?>)((QueueConsumer)configuredObject).getParent(Queue.class);
+            Queue<?> queue = (Queue<?>)((QueueConsumer<?,?>)configuredObject).getParent(Queue.class);
             setQueueProperties(queue, properties);
         }
         else if (isBrokerType(configuredObjectType))

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sun Dec  4 12:24:57 2016
@@ -68,8 +68,8 @@ public class AMQPConnection_0_10 extends
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
     private ServerDisassembler _disassembler;
 
-    private final Set<AMQSessionModel<?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
 
 
     public AMQPConnection_0_10(final Broker<?> broker,
@@ -275,7 +275,7 @@ public class AMQPConnection_0_10 extends
     }
 
     @Override
-    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
     {
         _sessionsWithWork.add(sessionModel);
         notifyWork();
@@ -306,7 +306,7 @@ public class AMQPConnection_0_10 extends
     }
 
     @Override
-    public void closeSessionAsync(final AMQSessionModel<?> session,
+    public void closeSessionAsync(final AMQSessionModel<?,?> session,
                                   final CloseReason reason, final String message)
     {
         _connection.closeSessionAsync((ServerSession) session, reason, message);
@@ -329,7 +329,7 @@ public class AMQPConnection_0_10 extends
         return _connection.getRemoteContainerName();
     }
 
-    public Collection<? extends AMQSessionModel<?>> getSessionModels()
+    public Collection<? extends ServerSession> getSessionModels()
     {
         return _connection.getSessionModels();
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Dec  4 12:24:57 2016
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.Option;
 import org.apache.qpid.util.ByteBufferUtils;
 import org.apache.qpid.util.GZIPUtils;
 
-public class ConsumerTarget_0_10 extends AbstractConsumerTarget
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0_10>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sun Dec  4 12:24:57 2016
@@ -495,16 +495,16 @@ public class ServerConnection extends Co
         }
     }
 
-    public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
+    public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?,?>> sessionsWithWork)
     {
         return new ProcessPendingIterator(sessionsWithWork);
     }
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private final Collection<AMQSessionModel<?>> _sessionsWithPending;
-        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
-        private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
+        private final Collection<AMQSessionModel<?,?>> _sessionsWithPending;
+        private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
+        private ProcessPendingIterator(final Set<AMQSessionModel<?,?>> sessionsWithWork)
         {
             _sessionsWithPending = sessionsWithWork;
             _sessionIterator = _sessionsWithPending.iterator();
@@ -556,7 +556,7 @@ public class ServerConnection extends Co
                     {
                         _sessionIterator = _sessionsWithPending.iterator();
                     }
-                    final AMQSessionModel<?> session = _sessionIterator.next();
+                    final AMQSessionModel<?,?> session = _sessionIterator.next();
                     return new Runnable()
                     {
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec  4 12:24:57 2016
@@ -117,7 +117,7 @@ import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.network.Ticker;
 
 public class ServerSession extends Session
-        implements AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+        implements AMQSessionModel<ServerSession, ConsumerTarget_0_10>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
                    Deletable<ServerSession>
 
 {
@@ -175,7 +175,7 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCount = new AtomicLong(0);
 
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
-    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
 
     private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
 
@@ -578,11 +578,11 @@ public class ServerSession extends Sessi
     }
 
 
-    public void register(final MessageInstanceConsumer messageInstanceConsumer)
+    public void register(final MessageInstanceConsumer<ConsumerTarget_0_10> messageInstanceConsumer)
     {
-        if(messageInstanceConsumer instanceof Consumer<?>)
+        if(messageInstanceConsumer instanceof Consumer<?,?>)
         {
-            final Consumer<?> consumer = (Consumer<?>) messageInstanceConsumer;
+            final Consumer<?,ConsumerTarget_0_10> consumer = (Consumer<?,ConsumerTarget_0_10>) messageInstanceConsumer;
             _consumers.add(consumer);
             consumer.addChangeListener(_consumerClosedListener);
             consumerAdded(consumer);
@@ -1098,7 +1098,7 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public Collection<Consumer<?>> getConsumers()
+    public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
     {
 
         return Collections.unmodifiableCollection(_consumers);
@@ -1156,7 +1156,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    private void consumerAdded(Consumer<?> consumer)
+    private void consumerAdded(Consumer<?, ConsumerTarget_0_10> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -1164,7 +1164,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    private void consumerRemoved(Consumer<?> consumer)
+    private void consumerRemoved(Consumer<?, ConsumerTarget_0_10> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -1232,9 +1232,9 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public void notifyWork(final ConsumerTarget target)
+    public void notifyWork(final ConsumerTarget_0_10 target)
     {
-        if(_consumersWithPendingWork.add((ConsumerTarget_0_10) target))
+        if(_consumersWithPendingWork.add(target))
         {
             getAMQPConnection().notifyWork(this);
         }
@@ -1284,7 +1284,7 @@ public class ServerSession extends Sessi
         {
             if(newState == org.apache.qpid.server.model.State.DELETED)
             {
-                consumerRemoved((Consumer<?>)object);
+                consumerRemoved((Consumer<?, ConsumerTarget_0_10>)object);
             }
         }
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec  4 12:24:57 2016
@@ -107,7 +107,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.transport.network.Ticker;
 
 public class AMQChannel
-        implements AMQSessionModel<AMQChannel>,
+        implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
                    AsyncAutoCommitTransaction.FutureRecorder,
                    ServerChannelMethodProcessor,
                    EventLoggerProvider, CreditRestorer
@@ -206,7 +206,7 @@ public class AMQChannel
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
     private final ImmediateAction _immediateAction = new ImmediateAction();
     private final Subject _subject;
-    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
@@ -323,7 +323,7 @@ public class AMQChannel
                                                              INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
 
-        MessageInstanceConsumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+        MessageInstanceConsumer<ConsumerTarget_0_8> sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
         target.updateNotifyWorkDesired();
         target.sendNextMessage();
         target.close();
@@ -675,11 +675,6 @@ public class AMQChannel
     }
 
 
-    public ConsumerTarget getSubscription(AMQShortString tag)
-    {
-        return _tag2SubscriptionTargetMap.get(tag);
-    }
-
     /**
      * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
      * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
@@ -818,15 +813,15 @@ public class AMQChannel
 
             for(MessageSource source : sources)
             {
-                MessageInstanceConsumer sub =
+                MessageInstanceConsumer<ConsumerTarget_0_8> sub =
                         source.addConsumer(target,
                                            filterManager,
                                            AMQMessage.class,
                                            AMQShortString.toString(tag),
                                            options, priority);
-                if (sub instanceof Consumer<?>)
+                if (sub instanceof Consumer<?, ?>)
                 {
-                    final Consumer<?> modelConsumer = (Consumer<?>) sub;
+                    final Consumer<?,ConsumerTarget_0_8> modelConsumer = (Consumer<?,ConsumerTarget_0_8>) sub;
                     consumerAdded(modelConsumer);
                     modelConsumer.addChangeListener(_consumerClosedListener);
                     _consumers.add(modelConsumer);
@@ -865,7 +860,7 @@ public class AMQChannel
         {
             for(MessageInstanceConsumer sub : subs)
             {
-                if (sub instanceof Consumer<?>)
+                if (sub instanceof Consumer<?,?>)
                 {
                     _consumers.remove(sub);
                 }
@@ -1907,7 +1902,7 @@ public class AMQChannel
     }
 
     @Override
-    public Collection<Consumer<?>> getConsumers()
+    public Collection<Consumer<?,ConsumerTarget_0_8>> getConsumers()
     {
         return Collections.unmodifiableCollection(_consumers);
     }
@@ -1919,12 +1914,12 @@ public class AMQChannel
         {
             if(newState == State.DELETED)
             {
-                consumerRemoved((Consumer<?>)object);
+                consumerRemoved((Consumer<?,?>)object);
             }
         }
     }
 
-    private void consumerAdded(final Consumer<?> consumer)
+    private void consumerAdded(final Consumer<?,?> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -1932,7 +1927,7 @@ public class AMQChannel
         }
     }
 
-    private void consumerRemoved(final Consumer<?> consumer)
+    private void consumerRemoved(final Consumer<?,?> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -3771,9 +3766,9 @@ public class AMQChannel
     }
 
     @Override
-    public void notifyWork(final ConsumerTarget target)
+    public void notifyWork(final ConsumerTarget_0_8 target)
     {
-        if(_consumersWithPendingWork.add((ConsumerTarget_0_8) target))
+        if(_consumersWithPendingWork.add(target))
         {
             getAMQPConnection().notifyWork(this);
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sun Dec  4 12:24:57 2016
@@ -158,8 +158,8 @@ public class AMQPConnection_0_8Impl
     private volatile boolean _transportBlockedForWriting;
     private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
 
-    private final Set<AMQSessionModel<?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
 
 
     public AMQPConnection_0_8Impl(Broker<?> broker,
@@ -741,7 +741,7 @@ public class AMQPConnection_0_8Impl
         return String.valueOf(getNetwork().getRemoteAddress());
     }
 
-    public void closeSessionAsync(final AMQSessionModel<?> session, final CloseReason reason, final String message)
+    public void closeSessionAsync(final AMQSessionModel<?,?> session, final CloseReason reason, final String message)
     {
         final int cause;
         switch (reason)
@@ -1355,7 +1355,7 @@ public class AMQPConnection_0_8Impl
     }
 
     @Override
-    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
     {
         _sessionsWithWork.add(sessionModel);
         notifyWork();
@@ -1385,7 +1385,7 @@ public class AMQPConnection_0_8Impl
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
 
         private ProcessPendingIterator()
         {
@@ -1437,7 +1437,7 @@ public class AMQPConnection_0_8Impl
                     {
                         _sessionIterator = _sessionsWithWork.iterator();
                     }
-                    final AMQSessionModel<?> session = _sessionIterator.next();
+                    final AMQSessionModel<?,?> session = _sessionIterator.next();
                     return new Runnable()
                     {
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sun Dec  4 12:24:57 2016
@@ -44,7 +44,7 @@ import org.apache.qpid.server.util.State
  * Ties together the protocol session of a subscriber, the consumer tag
  * that was given out by the broker and the channel id.
  */
-public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<ConsumerTarget_0_8>
 {
 
     private final ClientDeliveryMethod _deliveryMethod;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sun Dec  4 12:24:57 2016
@@ -236,8 +236,8 @@ public class AMQPConnection_1_0 extends
 
     private boolean _closedOnOpen;
 
-    private final Set<AMQSessionModel<?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
 
     AMQPConnection_1_0(final Broker<?> broker,
                        final ServerNetworkConnection network,
@@ -1357,7 +1357,7 @@ public class AMQPConnection_1_0 extends
     }
 
     @Override
-    public void notifyWork(final AMQSessionModel<?> sessionModel)
+    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
     {
         _sessionsWithWork.add(sessionModel);
         notifyWork();
@@ -1409,7 +1409,7 @@ public class AMQPConnection_1_0 extends
         addAsyncTask(action);
     }
 
-    public void closeSessionAsync(final AMQSessionModel<?> session,
+    public void closeSessionAsync(final AMQSessionModel<?,?> session,
                                   final CloseReason reason, final String message)
     {
         final ErrorCondition cause;
@@ -1603,7 +1603,7 @@ public class AMQPConnection_1_0 extends
 
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
-        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
         private ProcessPendingIterator()
         {
             _sessionIterator = _sessionsWithWork.iterator();
@@ -1653,7 +1653,7 @@ public class AMQPConnection_1_0 extends
                     {
                         _sessionIterator = _sessionsWithWork.iterator();
                     }
-                    final AMQSessionModel<?> session = _sessionIterator.next();
+                    final AMQSessionModel<?,?> session = _sessionIterator.next();
                     return new Runnable()
                     {
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sun Dec  4 12:24:57 2016
@@ -55,7 +55,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-class ConsumerTarget_1_0 extends AbstractConsumerTarget
+class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
     private final boolean _acquires;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Dec  4 12:24:57 2016
@@ -109,7 +109,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.network.Ticker;
 
-public class Session_1_0 implements AMQSessionModel<Session_1_0>, LogSubject
+public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
 {
     private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
@@ -131,7 +131,7 @@ public class Session_1_0 implements AMQS
     private AtomicBoolean _closed = new AtomicBoolean();
     private final Subject _subject = new Subject();
 
-    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
 
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
@@ -1085,9 +1085,9 @@ public class Session_1_0 implements AMQS
     private void registerConsumer(final SendingLink_1_0 link)
     {
         MessageInstanceConsumer consumer = link.getConsumer();
-        if(consumer instanceof Consumer<?>)
+        if(consumer instanceof Consumer<?,?>)
         {
-            Consumer<?> modelConsumer = (Consumer<?>) consumer;
+            Consumer<?,ConsumerTarget_1_0> modelConsumer = (Consumer<?,ConsumerTarget_1_0>) consumer;
             _consumers.add(modelConsumer);
             modelConsumer.addChangeListener(_consumerClosedListener);
             consumerAdded(modelConsumer);
@@ -1552,7 +1552,7 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public Collection<Consumer<?>> getConsumers()
+    public Collection<Consumer<?, ConsumerTarget_1_0>> getConsumers()
     {
         return Collections.unmodifiableCollection(_consumers);
     }
@@ -1638,9 +1638,9 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public void notifyWork(final ConsumerTarget target)
+    public void notifyWork(final ConsumerTarget_1_0 target)
     {
-        if(_consumersWithPendingWork.add((ConsumerTarget_1_0) target))
+        if(_consumersWithPendingWork.add(target))
         {
             getAMQPConnection().notifyWork(this);
         }
@@ -1652,7 +1652,7 @@ public class Session_1_0 implements AMQS
         getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
     }
 
-    private void consumerAdded(Consumer<?> consumer)
+    private void consumerAdded(Consumer<?, ConsumerTarget_1_0> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -1660,7 +1660,7 @@ public class Session_1_0 implements AMQS
         }
     }
 
-    private void consumerRemoved(Consumer<?> consumer)
+    private void consumerRemoved(Consumer<?, ConsumerTarget_1_0> consumer)
     {
         for(ConsumerListener l : _consumerListeners)
         {
@@ -1675,7 +1675,7 @@ public class Session_1_0 implements AMQS
         {
             if(newState == org.apache.qpid.server.model.State.DELETED)
             {
-                consumerRemoved((Consumer<?>)object);
+                consumerRemoved((Consumer<?, ConsumerTarget_1_0>)object);
             }
         }
     }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec  4 12:24:57 2016
@@ -677,7 +677,7 @@ class ManagementNode implements MessageS
             {
                 return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+ConfiguredObject.ID+"' cannot be set when creating an object");
             }
-            if(!attributes.containsKey(QPID_TYPE) && _model.getTypeRegistry().getCategory(clazz) != clazz)
+            if(!attributes.containsKey(QPID_TYPE) && ConfiguredObjectTypeRegistry.getCategory(clazz) != clazz)
             {
                 Class<? extends ConfiguredObject> typeClass = _model.getTypeRegistry().getTypeClass(clazz);
                 String type = typeClass.getAnnotation(ManagedObject.class).type();
@@ -699,7 +699,7 @@ class ManagementNode implements MessageS
                 {
 
                     List<ConfiguredObject> parents =
-                            _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(getPathElements(path)), hierarchy, _model.getTypeRegistry().getCategory(clazz));
+                            _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(getPathElements(path)), hierarchy, ConfiguredObjectTypeRegistry.getCategory(clazz));
                     if(parents.isEmpty())
                     {
                         return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "The '"+OBJECT_PATH+"' "+path+" does not identify a valid parent");
@@ -748,7 +748,7 @@ class ManagementNode implements MessageS
                 }
 
 
-                final ConfiguredObject object = primaryParent.createChild(_model.getTypeRegistry().getCategory(clazz), attributes, otherParents);
+                final ConfiguredObject object = primaryParent.createChild(ConfiguredObjectTypeRegistry.getCategory(clazz), attributes, otherParents);
                 return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader,
                                                         _managementOutputConverter.convertToOutput(object, true));
             }
@@ -1052,8 +1052,8 @@ class ManagementNode implements MessageS
 
     private Map<?, ?> performQuery(final Map<String, Object> headerMap, final Map messageBody)
     {
-
-        List<Object> attributeNameObjects = _managementInputConverter.convert(List.class, messageBody.get(ATTRIBUTE_NAMES));
+        @SuppressWarnings("unchecked")
+        List<Object> attributeNameObjects = (List<Object>)_managementInputConverter.convert(List.class, messageBody.get(ATTRIBUTE_NAMES));
         List<String> attributeNames;
         if(attributeNameObjects == null)
         {
@@ -1213,8 +1213,10 @@ class ManagementNode implements MessageS
                 {
                     if(ancestorCategories.contains(entry.getKey()))
                     {
-                        ConfiguredObjectOperation op = entry.getValue();
-                        for(ConfiguredObject<?> parent : (Collection<ConfiguredObject<?>>) op.perform(_managedObject, Collections.<String,Object>emptyMap()))
+                        @SuppressWarnings("unchecked")
+                        final Collection<ConfiguredObject<?>> parents = getAssociatedChildren(entry.getValue(), _managedObject);
+
+                        for(ConfiguredObject<?> parent : parents)
                         {
                             foundObjects.addAll(getChildrenOfType(parent, type));
                         }
@@ -1241,8 +1243,11 @@ class ManagementNode implements MessageS
         parents.add(_managedObject);
         for(ConfiguredObjectOperation op : _associatedChildrenOperations.values())
         {
-            parents.addAll(
-                    (Collection<ConfiguredObject<?>>) op.perform(_managedObject, Collections.<String,Object>emptyMap()));
+
+            @SuppressWarnings("unchecked")
+            final Collection<ConfiguredObject<?>> associated = getAssociatedChildren(op, _managedObject);
+
+            parents.addAll(associated);
         }
         foundObjects.addAll(parents);
         do
@@ -1263,6 +1268,13 @@ class ManagementNode implements MessageS
         return foundObjects;
     }
 
+    private static <C extends ConfiguredObject<?>> Collection<ConfiguredObject<?>> getAssociatedChildren(final ConfiguredObjectOperation<C> op, final ConfiguredObject<?> managedObject)
+    {
+        @SuppressWarnings("unchecked")
+        final Collection<ConfiguredObject<?>> associated = (Collection<ConfiguredObject<?>>) op.perform((C)managedObject, Collections.<String, Object>emptyMap());
+        return associated;
+    }
+
     private List<String> generateAttributeNames(String entityType)
     {
         Set<String> attrNameSet = new HashSet<>();
@@ -1431,7 +1443,7 @@ class ManagementNode implements MessageS
 
 
     @Override
-    public synchronized  ManagementNodeConsumer addConsumer(final ConsumerTarget target,
+    public synchronized <T extends ConsumerTarget<T>> ManagementNodeConsumer<T> addConsumer(final T target,
                                                             final FilterManager filters,
                                                             final Class<? extends ServerMessage> messageClass,
                                                             final String consumerName,
@@ -1439,7 +1451,7 @@ class ManagementNode implements MessageS
                                                             final Integer priority)
     {
 
-        final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
+        final ManagementNodeConsumer<T> managementNodeConsumer = new ManagementNodeConsumer<>(consumerName,this, target);
         target.consumerAdded(managementNodeConsumer);
         _consumers.add(managementNodeConsumer);
         return managementNodeConsumer;
@@ -1452,7 +1464,7 @@ class ManagementNode implements MessageS
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?,?> session)
     {
         return true;
     }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec  4 12:24:57 2016
@@ -41,16 +41,16 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
-class ManagementNodeConsumer implements MessageInstanceConsumer, MessageDestination
+class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination
 {
     private final ManagementNode _managementNode;
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
-    private final ConsumerTarget _target;
+    private final T _target;
     private final String _name;
     private final Object _identifier = new Object();
 
 
-    public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
+    public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, T target)
     {
         _name = consumerName;
         _managementNode = managementNode;
@@ -151,7 +151,7 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public ConsumerTarget getTarget()
+    public T getTarget()
     {
         return _target;
     }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org