You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 12:24:58 UTC
svn commit: r1772527 [2/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/filter/
broker-core/src/main/java/org/apache/qpid/server/logging/subjects/
broker-core/src/mai...
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun Dec 4 12:24:57 2016
@@ -80,7 +80,7 @@ abstract class AbstractQueueTestBase ext
private String _routingKey = "routing key";
private DirectExchangeImpl _exchange;
private TestConsumerTarget _consumerTarget = new TestConsumerTarget();
- private QueueConsumer<?> _consumer;
+ private QueueConsumer<?,?> _consumer;
private Map<String,Object> _arguments = Collections.emptyMap();
@Override
@@ -171,7 +171,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageA = createMessage(new Long(24));
// Check adding a consumer adds it to the queue
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
assertEquals("Queue does not have consumer", 1,
@@ -203,7 +203,7 @@ abstract class AbstractQueueTestBase ext
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA, null, null);
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
@@ -221,7 +221,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA, null, null);
_queue.enqueue(messageB, null, null);
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
@@ -244,7 +244,7 @@ abstract class AbstractQueueTestBase ext
AMQMessageHeader messageHeader = messageA.getMessageHeader();
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
@@ -270,7 +270,7 @@ abstract class AbstractQueueTestBase ext
AMQMessageHeader messageHeader = messageA.getMessageHeader();
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
@@ -296,7 +296,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageB, null, null);
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
while(_consumerTarget.processPending());
@@ -324,7 +324,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageC = createMessage(new Long(26));
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
@@ -394,7 +394,7 @@ abstract class AbstractQueueTestBase ext
}
};
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.SEES_REQUEUES,
ConsumerOption.ACQUIRES), 0);
@@ -465,7 +465,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
@@ -562,7 +562,7 @@ abstract class AbstractQueueTestBase ext
ServerMessage messageA = createMessage(new Long(24));
// Check adding an exclusive consumer adds it to the queue
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.EXCLUSIVE, ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
@@ -599,14 +599,14 @@ abstract class AbstractQueueTestBase ext
// Check we cannot add an exclusive subscriber to a queue with an
// existing consumer
_consumer.close();
- _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
try
{
- _consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ _consumer = (QueueConsumer<?,?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.EXCLUSIVE), 0);
}
Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java Sun Dec 4 12:24:57 2016
@@ -245,7 +245,7 @@ class LegacyAccessControlAdapter
}
else if (configuredObject instanceof QueueConsumer)
{
- Queue<?> queue = (Queue<?>)((QueueConsumer)configuredObject).getParent(Queue.class);
+ Queue<?> queue = (Queue<?>)((QueueConsumer<?,?>)configuredObject).getParent(Queue.class);
setQueueProperties(queue, properties);
}
else if (isBrokerType(configuredObjectType))
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sun Dec 4 12:24:57 2016
@@ -68,8 +68,8 @@ public class AMQPConnection_0_10 extends
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private ServerDisassembler _disassembler;
- private final Set<AMQSessionModel<?>> _sessionsWithWork =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+ private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
public AMQPConnection_0_10(final Broker<?> broker,
@@ -275,7 +275,7 @@ public class AMQPConnection_0_10 extends
}
@Override
- public void notifyWork(final AMQSessionModel<?> sessionModel)
+ public void notifyWork(final AMQSessionModel<?,?> sessionModel)
{
_sessionsWithWork.add(sessionModel);
notifyWork();
@@ -306,7 +306,7 @@ public class AMQPConnection_0_10 extends
}
@Override
- public void closeSessionAsync(final AMQSessionModel<?> session,
+ public void closeSessionAsync(final AMQSessionModel<?,?> session,
final CloseReason reason, final String message)
{
_connection.closeSessionAsync((ServerSession) session, reason, message);
@@ -329,7 +329,7 @@ public class AMQPConnection_0_10 extends
return _connection.getRemoteContainerName();
}
- public Collection<? extends AMQSessionModel<?>> getSessionModels()
+ public Collection<? extends ServerSession> getSessionModels()
{
return _connection.getSessionModels();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Dec 4 12:24:57 2016
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
-public class ConsumerTarget_0_10 extends AbstractConsumerTarget
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0_10>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sun Dec 4 12:24:57 2016
@@ -495,16 +495,16 @@ public class ServerConnection extends Co
}
}
- public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
+ public Iterator<Runnable> processPendingIterator(final Set<AMQSessionModel<?,?>> sessionsWithWork)
{
return new ProcessPendingIterator(sessionsWithWork);
}
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private final Collection<AMQSessionModel<?>> _sessionsWithPending;
- private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
- private ProcessPendingIterator(final Set<AMQSessionModel<?>> sessionsWithWork)
+ private final Collection<AMQSessionModel<?,?>> _sessionsWithPending;
+ private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
+ private ProcessPendingIterator(final Set<AMQSessionModel<?,?>> sessionsWithWork)
{
_sessionsWithPending = sessionsWithWork;
_sessionIterator = _sessionsWithPending.iterator();
@@ -556,7 +556,7 @@ public class ServerConnection extends Co
{
_sessionIterator = _sessionsWithPending.iterator();
}
- final AMQSessionModel<?> session = _sessionIterator.next();
+ final AMQSessionModel<?,?> session = _sessionIterator.next();
return new Runnable()
{
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec 4 12:24:57 2016
@@ -117,7 +117,7 @@ import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.network.Ticker;
public class ServerSession extends Session
- implements AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+ implements AMQSessionModel<ServerSession, ConsumerTarget_0_10>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
Deletable<ServerSession>
{
@@ -175,7 +175,7 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCount = new AtomicLong(0);
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
@@ -578,11 +578,11 @@ public class ServerSession extends Sessi
}
- public void register(final MessageInstanceConsumer messageInstanceConsumer)
+ public void register(final MessageInstanceConsumer<ConsumerTarget_0_10> messageInstanceConsumer)
{
- if(messageInstanceConsumer instanceof Consumer<?>)
+ if(messageInstanceConsumer instanceof Consumer<?,?>)
{
- final Consumer<?> consumer = (Consumer<?>) messageInstanceConsumer;
+ final Consumer<?,ConsumerTarget_0_10> consumer = (Consumer<?,ConsumerTarget_0_10>) messageInstanceConsumer;
_consumers.add(consumer);
consumer.addChangeListener(_consumerClosedListener);
consumerAdded(consumer);
@@ -1098,7 +1098,7 @@ public class ServerSession extends Sessi
}
@Override
- public Collection<Consumer<?>> getConsumers()
+ public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
{
return Collections.unmodifiableCollection(_consumers);
@@ -1156,7 +1156,7 @@ public class ServerSession extends Sessi
}
}
- private void consumerAdded(Consumer<?> consumer)
+ private void consumerAdded(Consumer<?, ConsumerTarget_0_10> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -1164,7 +1164,7 @@ public class ServerSession extends Sessi
}
}
- private void consumerRemoved(Consumer<?> consumer)
+ private void consumerRemoved(Consumer<?, ConsumerTarget_0_10> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -1232,9 +1232,9 @@ public class ServerSession extends Sessi
}
@Override
- public void notifyWork(final ConsumerTarget target)
+ public void notifyWork(final ConsumerTarget_0_10 target)
{
- if(_consumersWithPendingWork.add((ConsumerTarget_0_10) target))
+ if(_consumersWithPendingWork.add(target))
{
getAMQPConnection().notifyWork(this);
}
@@ -1284,7 +1284,7 @@ public class ServerSession extends Sessi
{
if(newState == org.apache.qpid.server.model.State.DELETED)
{
- consumerRemoved((Consumer<?>)object);
+ consumerRemoved((Consumer<?, ConsumerTarget_0_10>)object);
}
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec 4 12:24:57 2016
@@ -107,7 +107,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.transport.network.Ticker;
public class AMQChannel
- implements AMQSessionModel<AMQChannel>,
+ implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
AsyncAutoCommitTransaction.FutureRecorder,
ServerChannelMethodProcessor,
EventLoggerProvider, CreditRestorer
@@ -206,7 +206,7 @@ public class AMQChannel
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
private final Subject _subject;
- private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -323,7 +323,7 @@ public class AMQChannel
INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
- MessageInstanceConsumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+ MessageInstanceConsumer<ConsumerTarget_0_8> sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
target.updateNotifyWorkDesired();
target.sendNextMessage();
target.close();
@@ -675,11 +675,6 @@ public class AMQChannel
}
- public ConsumerTarget getSubscription(AMQShortString tag)
- {
- return _tag2SubscriptionTargetMap.get(tag);
- }
-
/**
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
@@ -818,15 +813,15 @@ public class AMQChannel
for(MessageSource source : sources)
{
- MessageInstanceConsumer sub =
+ MessageInstanceConsumer<ConsumerTarget_0_8> sub =
source.addConsumer(target,
filterManager,
AMQMessage.class,
AMQShortString.toString(tag),
options, priority);
- if (sub instanceof Consumer<?>)
+ if (sub instanceof Consumer<?, ?>)
{
- final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ final Consumer<?,ConsumerTarget_0_8> modelConsumer = (Consumer<?,ConsumerTarget_0_8>) sub;
consumerAdded(modelConsumer);
modelConsumer.addChangeListener(_consumerClosedListener);
_consumers.add(modelConsumer);
@@ -865,7 +860,7 @@ public class AMQChannel
{
for(MessageInstanceConsumer sub : subs)
{
- if (sub instanceof Consumer<?>)
+ if (sub instanceof Consumer<?,?>)
{
_consumers.remove(sub);
}
@@ -1907,7 +1902,7 @@ public class AMQChannel
}
@Override
- public Collection<Consumer<?>> getConsumers()
+ public Collection<Consumer<?,ConsumerTarget_0_8>> getConsumers()
{
return Collections.unmodifiableCollection(_consumers);
}
@@ -1919,12 +1914,12 @@ public class AMQChannel
{
if(newState == State.DELETED)
{
- consumerRemoved((Consumer<?>)object);
+ consumerRemoved((Consumer<?,?>)object);
}
}
}
- private void consumerAdded(final Consumer<?> consumer)
+ private void consumerAdded(final Consumer<?,?> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -1932,7 +1927,7 @@ public class AMQChannel
}
}
- private void consumerRemoved(final Consumer<?> consumer)
+ private void consumerRemoved(final Consumer<?,?> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -3771,9 +3766,9 @@ public class AMQChannel
}
@Override
- public void notifyWork(final ConsumerTarget target)
+ public void notifyWork(final ConsumerTarget_0_8 target)
{
- if(_consumersWithPendingWork.add((ConsumerTarget_0_8) target))
+ if(_consumersWithPendingWork.add(target))
{
getAMQPConnection().notifyWork(this);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sun Dec 4 12:24:57 2016
@@ -158,8 +158,8 @@ public class AMQPConnection_0_8Impl
private volatile boolean _transportBlockedForWriting;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
- private final Set<AMQSessionModel<?>> _sessionsWithWork =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+ private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
public AMQPConnection_0_8Impl(Broker<?> broker,
@@ -741,7 +741,7 @@ public class AMQPConnection_0_8Impl
return String.valueOf(getNetwork().getRemoteAddress());
}
- public void closeSessionAsync(final AMQSessionModel<?> session, final CloseReason reason, final String message)
+ public void closeSessionAsync(final AMQSessionModel<?,?> session, final CloseReason reason, final String message)
{
final int cause;
switch (reason)
@@ -1355,7 +1355,7 @@ public class AMQPConnection_0_8Impl
}
@Override
- public void notifyWork(final AMQSessionModel<?> sessionModel)
+ public void notifyWork(final AMQSessionModel<?,?> sessionModel)
{
_sessionsWithWork.add(sessionModel);
notifyWork();
@@ -1385,7 +1385,7 @@ public class AMQPConnection_0_8Impl
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+ private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
private ProcessPendingIterator()
{
@@ -1437,7 +1437,7 @@ public class AMQPConnection_0_8Impl
{
_sessionIterator = _sessionsWithWork.iterator();
}
- final AMQSessionModel<?> session = _sessionIterator.next();
+ final AMQSessionModel<?,?> session = _sessionIterator.next();
return new Runnable()
{
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sun Dec 4 12:24:57 2016
@@ -44,7 +44,7 @@ import org.apache.qpid.server.util.State
* Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id.
*/
-public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<ConsumerTarget_0_8>
{
private final ClientDeliveryMethod _deliveryMethod;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sun Dec 4 12:24:57 2016
@@ -236,8 +236,8 @@ public class AMQPConnection_1_0 extends
private boolean _closedOnOpen;
- private final Set<AMQSessionModel<?>> _sessionsWithWork =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?>, Boolean>());
+ private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
AMQPConnection_1_0(final Broker<?> broker,
final ServerNetworkConnection network,
@@ -1357,7 +1357,7 @@ public class AMQPConnection_1_0 extends
}
@Override
- public void notifyWork(final AMQSessionModel<?> sessionModel)
+ public void notifyWork(final AMQSessionModel<?,?> sessionModel)
{
_sessionsWithWork.add(sessionModel);
notifyWork();
@@ -1409,7 +1409,7 @@ public class AMQPConnection_1_0 extends
addAsyncTask(action);
}
- public void closeSessionAsync(final AMQSessionModel<?> session,
+ public void closeSessionAsync(final AMQSessionModel<?,?> session,
final CloseReason reason, final String message)
{
final ErrorCondition cause;
@@ -1603,7 +1603,7 @@ public class AMQPConnection_1_0 extends
private class ProcessPendingIterator implements Iterator<Runnable>
{
- private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+ private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
private ProcessPendingIterator()
{
_sessionIterator = _sessionsWithWork.iterator();
@@ -1653,7 +1653,7 @@ public class AMQPConnection_1_0 extends
{
_sessionIterator = _sessionsWithWork.iterator();
}
- final AMQSessionModel<?> session = _sessionIterator.next();
+ final AMQSessionModel<?,?> session = _sessionIterator.next();
return new Runnable()
{
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sun Dec 4 12:24:57 2016
@@ -55,7 +55,7 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-class ConsumerTarget_1_0 extends AbstractConsumerTarget
+class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private final boolean _acquires;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Dec 4 12:24:57 2016
@@ -109,7 +109,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.network.Ticker;
-public class Session_1_0 implements AMQSessionModel<Session_1_0>, LogSubject
+public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
{
private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
@@ -131,7 +131,7 @@ public class Session_1_0 implements AMQS
private AtomicBoolean _closed = new AtomicBoolean();
private final Subject _subject = new Subject();
- private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
@@ -1085,9 +1085,9 @@ public class Session_1_0 implements AMQS
private void registerConsumer(final SendingLink_1_0 link)
{
MessageInstanceConsumer consumer = link.getConsumer();
- if(consumer instanceof Consumer<?>)
+ if(consumer instanceof Consumer<?,?>)
{
- Consumer<?> modelConsumer = (Consumer<?>) consumer;
+ Consumer<?,ConsumerTarget_1_0> modelConsumer = (Consumer<?,ConsumerTarget_1_0>) consumer;
_consumers.add(modelConsumer);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
@@ -1552,7 +1552,7 @@ public class Session_1_0 implements AMQS
}
@Override
- public Collection<Consumer<?>> getConsumers()
+ public Collection<Consumer<?, ConsumerTarget_1_0>> getConsumers()
{
return Collections.unmodifiableCollection(_consumers);
}
@@ -1638,9 +1638,9 @@ public class Session_1_0 implements AMQS
}
@Override
- public void notifyWork(final ConsumerTarget target)
+ public void notifyWork(final ConsumerTarget_1_0 target)
{
- if(_consumersWithPendingWork.add((ConsumerTarget_1_0) target))
+ if(_consumersWithPendingWork.add(target))
{
getAMQPConnection().notifyWork(this);
}
@@ -1652,7 +1652,7 @@ public class Session_1_0 implements AMQS
getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
- private void consumerAdded(Consumer<?> consumer)
+ private void consumerAdded(Consumer<?, ConsumerTarget_1_0> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -1660,7 +1660,7 @@ public class Session_1_0 implements AMQS
}
}
- private void consumerRemoved(Consumer<?> consumer)
+ private void consumerRemoved(Consumer<?, ConsumerTarget_1_0> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
@@ -1675,7 +1675,7 @@ public class Session_1_0 implements AMQS
{
if(newState == org.apache.qpid.server.model.State.DELETED)
{
- consumerRemoved((Consumer<?>)object);
+ consumerRemoved((Consumer<?, ConsumerTarget_1_0>)object);
}
}
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec 4 12:24:57 2016
@@ -677,7 +677,7 @@ class ManagementNode implements MessageS
{
return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+ConfiguredObject.ID+"' cannot be set when creating an object");
}
- if(!attributes.containsKey(QPID_TYPE) && _model.getTypeRegistry().getCategory(clazz) != clazz)
+ if(!attributes.containsKey(QPID_TYPE) && ConfiguredObjectTypeRegistry.getCategory(clazz) != clazz)
{
Class<? extends ConfiguredObject> typeClass = _model.getTypeRegistry().getTypeClass(clazz);
String type = typeClass.getAnnotation(ManagedObject.class).type();
@@ -699,7 +699,7 @@ class ManagementNode implements MessageS
{
List<ConfiguredObject> parents =
- _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(getPathElements(path)), hierarchy, _model.getTypeRegistry().getCategory(clazz));
+ _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(getPathElements(path)), hierarchy, ConfiguredObjectTypeRegistry.getCategory(clazz));
if(parents.isEmpty())
{
return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "The '"+OBJECT_PATH+"' "+path+" does not identify a valid parent");
@@ -748,7 +748,7 @@ class ManagementNode implements MessageS
}
- final ConfiguredObject object = primaryParent.createChild(_model.getTypeRegistry().getCategory(clazz), attributes, otherParents);
+ final ConfiguredObject object = primaryParent.createChild(ConfiguredObjectTypeRegistry.getCategory(clazz), attributes, otherParents);
return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader,
_managementOutputConverter.convertToOutput(object, true));
}
@@ -1052,8 +1052,8 @@ class ManagementNode implements MessageS
private Map<?, ?> performQuery(final Map<String, Object> headerMap, final Map messageBody)
{
-
- List<Object> attributeNameObjects = _managementInputConverter.convert(List.class, messageBody.get(ATTRIBUTE_NAMES));
+ @SuppressWarnings("unchecked")
+ List<Object> attributeNameObjects = (List<Object>)_managementInputConverter.convert(List.class, messageBody.get(ATTRIBUTE_NAMES));
List<String> attributeNames;
if(attributeNameObjects == null)
{
@@ -1213,8 +1213,10 @@ class ManagementNode implements MessageS
{
if(ancestorCategories.contains(entry.getKey()))
{
- ConfiguredObjectOperation op = entry.getValue();
- for(ConfiguredObject<?> parent : (Collection<ConfiguredObject<?>>) op.perform(_managedObject, Collections.<String,Object>emptyMap()))
+ @SuppressWarnings("unchecked")
+ final Collection<ConfiguredObject<?>> parents = getAssociatedChildren(entry.getValue(), _managedObject);
+
+ for(ConfiguredObject<?> parent : parents)
{
foundObjects.addAll(getChildrenOfType(parent, type));
}
@@ -1241,8 +1243,11 @@ class ManagementNode implements MessageS
parents.add(_managedObject);
for(ConfiguredObjectOperation op : _associatedChildrenOperations.values())
{
- parents.addAll(
- (Collection<ConfiguredObject<?>>) op.perform(_managedObject, Collections.<String,Object>emptyMap()));
+
+ @SuppressWarnings("unchecked")
+ final Collection<ConfiguredObject<?>> associated = getAssociatedChildren(op, _managedObject);
+
+ parents.addAll(associated);
}
foundObjects.addAll(parents);
do
@@ -1263,6 +1268,13 @@ class ManagementNode implements MessageS
return foundObjects;
}
+ private static <C extends ConfiguredObject<?>> Collection<ConfiguredObject<?>> getAssociatedChildren(final ConfiguredObjectOperation<C> op, final ConfiguredObject<?> managedObject)
+ {
+ @SuppressWarnings("unchecked")
+ final Collection<ConfiguredObject<?>> associated = (Collection<ConfiguredObject<?>>) op.perform((C)managedObject, Collections.<String, Object>emptyMap());
+ return associated;
+ }
+
private List<String> generateAttributeNames(String entityType)
{
Set<String> attrNameSet = new HashSet<>();
@@ -1431,7 +1443,7 @@ class ManagementNode implements MessageS
@Override
- public synchronized ManagementNodeConsumer addConsumer(final ConsumerTarget target,
+ public synchronized <T extends ConsumerTarget<T>> ManagementNodeConsumer<T> addConsumer(final T target,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
@@ -1439,7 +1451,7 @@ class ManagementNode implements MessageS
final Integer priority)
{
- final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
+ final ManagementNodeConsumer<T> managementNodeConsumer = new ManagementNodeConsumer<>(consumerName,this, target);
target.consumerAdded(managementNodeConsumer);
_consumers.add(managementNodeConsumer);
return managementNodeConsumer;
@@ -1452,7 +1464,7 @@ class ManagementNode implements MessageS
}
@Override
- public boolean verifySessionAccess(final AMQSessionModel<?> session)
+ public boolean verifySessionAccess(final AMQSessionModel<?,?> session)
{
return true;
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec 4 12:24:57 2016
@@ -41,16 +41,16 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-class ManagementNodeConsumer implements MessageInstanceConsumer, MessageDestination
+class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination
{
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
- private final ConsumerTarget _target;
+ private final T _target;
private final String _name;
private final Object _identifier = new Object();
- public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
+ public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, T target)
{
_name = consumerName;
_managementNode = managementNode;
@@ -151,7 +151,7 @@ class ManagementNodeConsumer implements
}
@Override
- public ConsumerTarget getTarget()
+ public T getTarget()
{
return _target;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org